spark git commit: [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0)
Repository: spark Updated Branches: refs/heads/branch-2.0 5bc5b49fa -> 9e91a1009 [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0) ## What changes were proposed in this pull request? Backport #14269 to 2.0. ## How was this patch tested? Jenkins. Author: Dhruve AsharCloses #15222 from zsxwing/SPARK-15703-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e91a100 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e91a100 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e91a100 Branch: refs/heads/branch-2.0 Commit: 9e91a1009e6f916245b4d4018de1664ea3decfe7 Parents: 5bc5b49 Author: Dhruve Ashar Authored: Fri Sep 23 14:59:53 2016 -0700 Committer: Shixiong Zhu Committed: Fri Sep 23 14:59:53 2016 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 4 +-- .../apache/spark/internal/config/package.scala | 5 .../spark/scheduler/LiveListenerBus.scala | 23 +-- .../scheduler/EventLoggingListenerSuite.scala | 4 +-- .../spark/scheduler/SparkListenerSuite.scala| 30 +++- .../storage/BlockManagerReplicationSuite.scala | 9 -- .../spark/storage/BlockManagerSuite.scala | 6 ++-- .../spark/ui/storage/StorageTabSuite.scala | 11 +++ .../streaming/ReceivedBlockHandlerSuite.scala | 5 +++- 9 files changed, 60 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 251c16f..ffd1227 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events - private[spark] val listenerBus = new LiveListenerBus + private[spark] val listenerBus = new LiveListenerBus(this) // This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( @@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } -listenerBus.start(this) +listenerBus.start() _listenerBusStarted = true } http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f28a9a5..29f812a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -112,4 +112,9 @@ package object config { // To limit how many applications are shown in the History Server summary ui private[spark] val HISTORY_UI_MAX_APPS = ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE) + + private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = +ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") + .intConf + .createWithDefault(1) } http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 1c21313..bfa3c40 
100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.DynamicVariable -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils /** @@ -32,18 +33,24 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus extends SparkListenerBus { +private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends
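The new configuration can be set like any other Spark property. A minimal usage sketch (the value below is only an example, not a recommendation; tune it to your event volume):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// With SPARK-15703 the listener-bus queue capacity is a plain Spark setting, so an
// application that sees "Dropping SparkListenerEvent" warnings can enlarge the queue
// instead of relying on a hard-coded constant.
val conf = new SparkConf()
  .setAppName("listener-bus-queue-demo")
  .setMaster("local[2]")
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000") // example value only
val sc = new SparkContext(conf)
// ... run jobs; listener events now flow through a queue of the configured capacity ...
sc.stop()
```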
[1/2] spark git commit: Preparing Spark release v2.0.1-rc3
Repository: spark Updated Branches: refs/heads/branch-2.0 b111a81f2 -> 5bc5b49fa Preparing Spark release v2.0.1-rc3 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d28cc10 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d28cc10 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d28cc10 Branch: refs/heads/branch-2.0 Commit: 9d28cc10357a8afcfb2fa2e6eecb5c2cc2730d17 Parents: b111a81 Author: Patrick WendellAuthored: Fri Sep 23 14:38:07 2016 -0700 Committer: Patrick Wendell Committed: Fri Sep 23 14:38:07 2016 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 5a83883..3e49eac 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R Frontend for Apache Spark -Version: 2.0.0 +Version: 2.0.1 Date: 2016-08-27 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), email = "shiva...@cs.berkeley.edu"), http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index ca6daa2..6db3a59 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index c727f54..269b845 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index e335a89..20cf29e 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 8e64f56..25cc328 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.1 ../../pom.xml 
http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/sketch/pom.xml
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.0.1-rc3 [created] 9d28cc103 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[2/2] spark git commit: Preparing development version 2.0.2-SNAPSHOT
Preparing development version 2.0.2-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bc5b49f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bc5b49f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bc5b49f Branch: refs/heads/branch-2.0 Commit: 5bc5b49fa0a5f3d395457aceff268938317f3718 Parents: 9d28cc1 Author: Patrick WendellAuthored: Fri Sep 23 14:38:13 2016 -0700 Committer: Patrick Wendell Committed: Fri Sep 23 14:38:13 2016 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 36 files changed, 37 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 3e49eac..dfb7e22 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R Frontend for Apache Spark -Version: 2.0.1 +Version: 2.0.2 Date: 2016-08-27 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), email = "shiva...@cs.berkeley.edu"), http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 6db3a59..ca6daa2 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 269b845..c727f54 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 20cf29e..e335a89 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 25cc328..8e64f56 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1 +2.0.2-SNAPSHOT ../../pom.xml 
http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/sketch/pom.xml
spark git commit: [SPARK-17651][SPARKR] Set R package version number along with mvn
Repository: spark Updated Branches: refs/heads/branch-2.0 452e468f2 -> b111a81f2 [SPARK-17651][SPARKR] Set R package version number along with mvn This PR sets the R package version while tagging releases. Note that since R doesn't accept `-SNAPSHOT` in version number field, we remove that while setting the next version Tested manually by running locally Author: Shivaram VenkataramanCloses #15223 from shivaram/sparkr-version-change. (cherry picked from commit 7c382524a959a2bc9b3d2fca44f6f0b41aba4e3c) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b111a81f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b111a81f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b111a81f Branch: refs/heads/branch-2.0 Commit: b111a81f2a5547e2357d66db4ba2f05ce69a52a6 Parents: 452e468 Author: Shivaram Venkataraman Authored: Fri Sep 23 14:35:18 2016 -0700 Committer: Reynold Xin Committed: Fri Sep 23 14:36:01 2016 -0700 -- dev/create-release/release-tag.sh | 15 +++ 1 file changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b111a81f/dev/create-release/release-tag.sh -- diff --git a/dev/create-release/release-tag.sh b/dev/create-release/release-tag.sh index d404939..b7e5100 100755 --- a/dev/create-release/release-tag.sh +++ b/dev/create-release/release-tag.sh @@ -60,12 +60,27 @@ git config user.email $GIT_EMAIL # Create release version $MVN versions:set -DnewVersion=$RELEASE_VERSION | grep -v "no value" # silence logs +# Set the release version in R/pkg/DESCRIPTION +sed -i".tmp1" 's/Version.*$/Version: '"$RELEASE_VERSION"'/g' R/pkg/DESCRIPTION +# Set the release version in docs +sed -i".tmp1" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$RELEASE_VERSION"'/g' docs/_config.yml +sed -i".tmp2" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$RELEASE_VERSION"'/g' docs/_config.yml + git commit -a -m "Preparing Spark release $RELEASE_TAG" echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH" git tag $RELEASE_TAG # Create next version $MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence logs +# Remove -SNAPSHOT before setting the R version as R expects version strings to only have numbers +R_NEXT_VERSION=`echo $NEXT_VERSION | sed 's/-SNAPSHOT//g'` +sed -i".tmp2" 's/Version.*$/Version: '"$R_NEXT_VERSION"'/g' R/pkg/DESCRIPTION + +# Update docs with next version +sed -i".tmp3" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$NEXT_VERSION"'/g' docs/_config.yml +# Use R version for short version +sed -i".tmp4" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION"'/g' docs/_config.yml + git commit -a -m "Preparing development version $NEXT_VERSION" # Push changes - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17651][SPARKR] Set R package version number along with mvn
Repository: spark Updated Branches: refs/heads/master 90a30f463 -> 7c382524a [SPARK-17651][SPARKR] Set R package version number along with mvn ## What changes were proposed in this pull request? This PR sets the R package version while tagging releases. Note that since R doesn't accept `-SNAPSHOT` in version number field, we remove that while setting the next version ## How was this patch tested? Tested manually by running locally Author: Shivaram VenkataramanCloses #15223 from shivaram/sparkr-version-change. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c382524 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c382524 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c382524 Branch: refs/heads/master Commit: 7c382524a959a2bc9b3d2fca44f6f0b41aba4e3c Parents: 90a30f4 Author: Shivaram Venkataraman Authored: Fri Sep 23 14:35:18 2016 -0700 Committer: Reynold Xin Committed: Fri Sep 23 14:35:18 2016 -0700 -- dev/create-release/release-tag.sh | 15 +++ 1 file changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c382524/dev/create-release/release-tag.sh -- diff --git a/dev/create-release/release-tag.sh b/dev/create-release/release-tag.sh index d404939..b7e5100 100755 --- a/dev/create-release/release-tag.sh +++ b/dev/create-release/release-tag.sh @@ -60,12 +60,27 @@ git config user.email $GIT_EMAIL # Create release version $MVN versions:set -DnewVersion=$RELEASE_VERSION | grep -v "no value" # silence logs +# Set the release version in R/pkg/DESCRIPTION +sed -i".tmp1" 's/Version.*$/Version: '"$RELEASE_VERSION"'/g' R/pkg/DESCRIPTION +# Set the release version in docs +sed -i".tmp1" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$RELEASE_VERSION"'/g' docs/_config.yml +sed -i".tmp2" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$RELEASE_VERSION"'/g' docs/_config.yml + git commit -a -m "Preparing Spark release $RELEASE_TAG" echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH" git tag $RELEASE_TAG # Create next version $MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence logs +# Remove -SNAPSHOT before setting the R version as R expects version strings to only have numbers +R_NEXT_VERSION=`echo $NEXT_VERSION | sed 's/-SNAPSHOT//g'` +sed -i".tmp2" 's/Version.*$/Version: '"$R_NEXT_VERSION"'/g' R/pkg/DESCRIPTION + +# Update docs with next version +sed -i".tmp3" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$NEXT_VERSION"'/g' docs/_config.yml +# Use R version for short version +sed -i".tmp4" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION"'/g' docs/_config.yml + git commit -a -m "Preparing development version $NEXT_VERSION" # Push changes - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [SPARK-12221] add cpu time to metrics
Repository: spark Updated Branches: refs/heads/master 988c71457 -> 90a30f463 http://git-wip-us.apache.org/repos/asf/spark/blob/90a30f46/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 00314ab..d5146d7 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -606,6 +606,9 @@ private[spark] object JsonProtocolSuite extends Assertions { private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) { assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime) +assert(metrics1.executorDeserializeCpuTime === metrics2.executorDeserializeCpuTime) +assert(metrics1.executorRunTime === metrics2.executorRunTime) +assert(metrics1.executorCpuTime === metrics2.executorCpuTime) assert(metrics1.resultSize === metrics2.resultSize) assert(metrics1.jvmGCTime === metrics2.jvmGCTime) assert(metrics1.resultSerializationTime === metrics2.resultSerializationTime) @@ -816,8 +819,11 @@ private[spark] object JsonProtocolSuite extends Assertions { hasOutput: Boolean, hasRecords: Boolean = true) = { val t = TaskMetrics.empty +// Set CPU times same as wall times for testing purpose t.setExecutorDeserializeTime(a) +t.setExecutorDeserializeCpuTime(a) t.setExecutorRunTime(b) +t.setExecutorCpuTime(b) t.setResultSize(c) t.setJvmGCTime(d) t.setResultSerializationTime(a + b) @@ -1097,7 +1103,9 @@ private[spark] object JsonProtocolSuite extends Assertions { | }, | "Task Metrics": { |"Executor Deserialize Time": 300, + |"Executor Deserialize CPU Time": 300, |"Executor Run Time": 400, + |"Executor CPU Time": 400, |"Result Size": 500, |"JVM GC Time": 600, |"Result Serialization Time": 700, @@ -1195,7 +1203,9 @@ private[spark] object JsonProtocolSuite extends Assertions { | }, | "Task Metrics": { |"Executor Deserialize Time": 300, + |"Executor Deserialize CPU Time": 300, |"Executor Run Time": 400, + |"Executor CPU Time": 400, |"Result Size": 500, |"JVM GC Time": 600, |"Result Serialization Time": 700, @@ -1293,7 +1303,9 @@ private[spark] object JsonProtocolSuite extends Assertions { | }, | "Task Metrics": { |"Executor Deserialize Time": 300, + |"Executor Deserialize CPU Time": 300, |"Executor Run Time": 400, + |"Executor CPU Time": 400, |"Result Size": 500, |"JVM GC Time": 600, |"Result Serialization Time": 700, @@ -1785,55 +1797,70 @@ private[spark] object JsonProtocolSuite extends Assertions { |}, |{ | "ID": 1, + | "Name": "$EXECUTOR_DESERIALIZE_CPU_TIME", + | "Update": 300, + | "Internal": true, + | "Count Failed Values": true + |}, + | + |{ + | "ID": 2, | "Name": "$EXECUTOR_RUN_TIME", | "Update": 400, | "Internal": true, | "Count Failed Values": true |}, |{ - | "ID": 2, + | "ID": 3, + | "Name": "$EXECUTOR_CPU_TIME", + | "Update": 400, + | "Internal": true, + | "Count Failed Values": true + |}, + |{ + | "ID": 4, | "Name": "$RESULT_SIZE", | "Update": 500, | "Internal": true, | "Count Failed Values": true |}, |{ - | "ID": 3, + | "ID": 5, | "Name": "$JVM_GC_TIME", | "Update": 600, | "Internal": true, | "Count Failed Values": true |}, |{ - | "ID": 4, + | "ID": 6, | "Name": "$RESULT_SERIALIZATION_TIME", | "Update": 700, | "Internal": true, | "Count Failed Values": true |}, |{ - | "ID": 5, + | "ID": 7, | "Name": "$MEMORY_BYTES_SPILLED", | "Update": 800, | "Internal": true, | "Count Failed Values": true |}, |{ - | "ID": 6, + | "ID": 8, | 
"Name": "$DISK_BYTES_SPILLED", | "Update": 0, | "Internal": true, | "Count Failed Values":
[2/2] spark git commit: [SPARK-12221] add cpu time to metrics
[SPARK-12221] add cpu time to metrics Currently task metrics don't support executor CPU time, so there's no way to calculate how much CPU time a stage/task took from History Server metrics. This PR enables reporting CPU time. Author: jisookimCloses #10212 from jisookim0513/add-cpu-time-metric. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90a30f46 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90a30f46 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90a30f46 Branch: refs/heads/master Commit: 90a30f46349182b6fc9d4123090c4712fdb425be Parents: 988c714 Author: jisookim Authored: Fri Sep 23 13:43:47 2016 -0700 Committer: Marcelo Vanzin Committed: Fri Sep 23 13:43:47 2016 -0700 -- .../org/apache/spark/InternalAccumulator.scala | 2 + .../org/apache/spark/executor/Executor.scala| 15 +++ .../org/apache/spark/executor/TaskMetrics.scala | 18 .../org/apache/spark/scheduler/ResultTask.scala | 8 ++ .../apache/spark/scheduler/ShuffleMapTask.scala | 8 ++ .../scala/org/apache/spark/scheduler/Task.scala | 2 + .../spark/status/api/v1/AllStagesResource.scala | 5 + .../org/apache/spark/status/api/v1/api.scala| 5 + .../spark/ui/jobs/JobProgressListener.scala | 4 + .../scala/org/apache/spark/ui/jobs/UIData.scala | 5 + .../org/apache/spark/util/JsonProtocol.scala| 10 ++ .../complete_stage_list_json_expectation.json | 3 + .../failed_stage_list_json_expectation.json | 1 + .../one_stage_attempt_json_expectation.json | 17 .../one_stage_json_expectation.json | 17 .../stage_list_json_expectation.json| 4 + ..._list_with_accumulable_json_expectation.json | 1 + .../stage_task_list_expectation.json| 40 ...m_multi_attempt_app_json_1__expectation.json | 16 +++ ...m_multi_attempt_app_json_2__expectation.json | 16 +++ ...ask_list_w__offset___length_expectation.json | 100 +++ .../stage_task_list_w__sortBy_expectation.json | 40 ...ortBy_short_names___runtime_expectation.json | 40 ...sortBy_short_names__runtime_expectation.json | 40 ...summary_w__custom_quantiles_expectation.json | 2 + ...task_summary_w_shuffle_read_expectation.json | 2 + ...ask_summary_w_shuffle_write_expectation.json | 2 + ...stage_with_accumulable_json_expectation.json | 17 .../apache/spark/util/JsonProtocolSuite.scala | 69 + project/MimaExcludes.scala | 4 + 30 files changed, 492 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90a30f46/core/src/main/scala/org/apache/spark/InternalAccumulator.scala -- diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 0b494c1..82d3098 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -31,7 +31,9 @@ private[spark] object InternalAccumulator { // Names of internal task level metrics val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime" + val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + "executorDeserializeCpuTime" val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime" + val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime" val RESULT_SIZE = METRICS_PREFIX + "resultSize" val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime" val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime" http://git-wip-us.apache.org/repos/asf/spark/blob/90a30f46/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 668ec41..9501dd9 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -232,13 +232,18 @@ private[spark] class Executor( } override def run(): Unit = { + val threadMXBean = ManagementFactory.getThreadMXBean val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) val deserializeStartTime = System.currentTimeMillis() + val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { +threadMXBean.getCurrentThreadCpuTime + } else 0L Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance()
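The measurement pattern introduced in Executor.run (sample the thread's CPU time before and after the work, guarded by isCurrentThreadCpuTimeSupported) is plain JMX and can be tried outside Spark. A stand-alone sketch with illustrative names of my own choosing:

```scala
import java.lang.management.ManagementFactory

// Sample the current thread's CPU time (in nanoseconds) around a block of work,
// falling back to 0 when the JVM does not support per-thread CPU timing.
def withCpuTime[T](body: => T): (T, Long) = {
  val bean = ManagementFactory.getThreadMXBean
  val supported = bean.isCurrentThreadCpuTimeSupported
  val start = if (supported) bean.getCurrentThreadCpuTime else 0L
  val result = body
  val end = if (supported) bean.getCurrentThreadCpuTime else 0L
  (result, end - start)
}

val (sum, cpuNanos) = withCpuTime { (1L to 1000000L).sum }
println(s"result=$sum, cpuTime=${cpuNanos / 1000000} ms")
```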
spark git commit: [SPARK-17577][CORE][2.0 BACKPORT] Update SparkContext.addFile to make it work well on Windows
Repository: spark Updated Branches: refs/heads/branch-2.0 1a8ea000e -> 452e468f2 [SPARK-17577][CORE][2.0 BACKPORT] Update SparkContext.addFile to make it work well on Windows ## What changes were proposed in this pull request? Update ```SparkContext.addFile``` to correct the use of ```URI``` and ```Path``` so that it works well on Windows. This is the branch-2.0 backport; see #15131 for more details. ## How was this patch tested? Backport, checked by AppVeyor. Author: Yanbo Liang. Closes #15217 from yanboliang/uri-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/452e468f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/452e468f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/452e468f Branch: refs/heads/branch-2.0 Commit: 452e468f280d69c930782a7588a87a816cc9585a Parents: 1a8ea00 Author: Yanbo Liang Authored: Sat Sep 24 04:50:22 2016 +0900 Committer: Kousuke Saruta Committed: Sat Sep 24 04:50:22 2016 +0900 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/452e468f/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 214758f..251c16f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1419,7 +1419,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * supported for Hadoop-supported filesystems. */ def addFile(path: String, recursive: Boolean): Unit = { -val uri = new URI(path) +val uri = new Path(path).toUri val schemeCorrectedPath = uri.getScheme match { case null | "local" => new File(path).getCanonicalFile.toURI.toString case _ => path @@ -1453,8 +1453,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli logInfo(s"Added file $path at $key with timestamp $timestamp") // Fetch the file locally so that closures which are run on the driver can still use the // SparkFiles API to access files. - Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, -hadoopConfiguration, timestamp, useCache = false) + Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf, +env.securityManager, hadoopConfiguration, timestamp, useCache = false) postEnvironmentUpdate() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
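The heart of the fix is deriving the URI through Hadoop's Path instead of java.net.URI, which rejects characters that are common in local file paths (spaces, and backslashes on Windows). A small illustration of the difference, using a made-up path; this is not Spark code, just the two constructors side by side:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

val raw = "/tmp/my data/file.txt"  // hypothetical path; the same idea applies to C:\... paths on Windows

// java.net.URI refuses the raw string because of the unescaped space.
println(scala.util.Try(new URI(raw)))  // Failure(java.net.URISyntaxException: Illegal character in path ...)

// Hadoop's Path builds a properly escaped URI from the same string,
// which is what addFile relies on after SPARK-17577.
println(new Path(raw).toUri)           // roughly: /tmp/my%20data/file.txt
```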
spark git commit: [SPARK-17643] Remove comparable requirement from Offset
Repository: spark Updated Branches: refs/heads/master f62ddc598 -> 988c71457 [SPARK-17643] Remove comparable requirement from Offset For some sources, it is difficult to provide a global ordering based only on the data in the offset. Since we don't use comparison for correctness, lets remove it. Author: Michael ArmbrustCloses #15207 from marmbrus/removeComparable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/988c7145 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/988c7145 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/988c7145 Branch: refs/heads/master Commit: 988c71457354b0a443471f501cef544a85b1a76a Parents: f62ddc5 Author: Michael Armbrust Authored: Fri Sep 23 12:17:59 2016 -0700 Committer: Tathagata Das Committed: Fri Sep 23 12:17:59 2016 -0700 -- .../execution/streaming/CompositeOffset.scala | 30 --- .../sql/execution/streaming/LongOffset.scala| 6 --- .../spark/sql/execution/streaming/Offset.scala | 19 ++ .../execution/streaming/StreamExecution.scala | 9 +++-- .../spark/sql/streaming/OffsetSuite.scala | 39 5 files changed, 9 insertions(+), 94 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 729c846..ebc6ee8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -24,36 +24,6 @@ package org.apache.spark.sql.execution.streaming */ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { /** - * Returns a negative integer, zero, or a positive integer as this object is less than, equal to, - * or greater than the specified object. - */ - override def compareTo(other: Offset): Int = other match { -case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size => - val comparisons = offsets.zip(otherComposite.offsets).map { -case (Some(a), Some(b)) => a compareTo b -case (None, None) => 0 -case (None, _) => -1 -case (_, None) => 1 - } - val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet - nonZeroSigns.size match { -case 0 => 0 // if both empty or only 0s -case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) -case _ => // there are both 1s and -1s - throw new IllegalArgumentException( -s"Invalid comparison between non-linear histories: $this <=> $other") - } -case _ => - throw new IllegalArgumentException(s"Cannot compare $this <=> $other") - } - - private def sign(num: Int): Int = num match { -case i if i < 0 => -1 -case i if i == 0 => 0 -case i if i > 0 => 1 - } - - /** * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of * sources. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index bb17640..c5e8827 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming */ case class LongOffset(offset: Long) extends Offset { - override def compareTo(other: Offset): Int = other match { -case l: LongOffset => offset.compareTo(l.offset) -case _ => - throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}") - } - def +(increment: Long): LongOffset = new LongOffset(offset + increment) def -(decrement: Long): LongOffset = new LongOffset(offset - decrement) http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala -- diff --git
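Because the engine only needs to know whether the latest offsets differ from what has already been committed, equality is enough once the compareTo requirement is gone. A hedged, self-contained sketch of that idea (the names are illustrative, not the actual StreamExecution code):

```scala
// Stand-in for a source-specific Offset; only equality matters here.
case class FakeOffset(json: String)

// "Is there new data?" reduces to a per-source equality check between the
// last committed offset and the latest offset each source reports.
def hasNewData(
    committed: Map[String, FakeOffset],
    available: Map[String, FakeOffset]): Boolean =
  available.exists { case (source, offset) => !committed.get(source).contains(offset) }

val committedOffsets = Map("source-0" -> FakeOffset("""{"p0": 5}"""))
val latestOffsets    = Map("source-0" -> FakeOffset("""{"p0": 7}"""))
println(hasNewData(committedOffsets, latestOffsets))  // true: offsets differ, so run another batch
```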
spark git commit: [SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running sparkr in RStudio
Repository: spark Updated Branches: refs/heads/branch-2.0 d3f90e71a -> 1a8ea000e [SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running sparkr in RStudio ## What changes were proposed in this pull request? Spark will add sparkr.zip to archive only when it is yarn mode (SparkSubmit.scala). ``` if (args.isR && clusterManager == YARN) { val sparkRPackagePath = RUtils.localSparkRPackagePath if (sparkRPackagePath.isEmpty) { printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.") } val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE) if (!sparkRPackageFile.exists()) { printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.") } val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString // Distribute the SparkR package. // Assigns a symbol link name "sparkr" to the shipped package. args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr") // Distribute the R package archive containing all the built R packages. if (!RUtils.rPackages.isEmpty) { val rPackageFile = RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE) if (!rPackageFile.exists()) { printErrorAndExit("Failed to zip all the built R packages.") } val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString // Assigns a symbol link name "rpkg" to the shipped package. args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg") } } ``` So it is necessary to pass spark.master from R process to JVM. Otherwise sparkr.zip won't be distributed to executor. Besides that I also pass spark.yarn.keytab/spark.yarn.principal to spark side, because JVM process need them to access secured cluster. ## How was this patch tested? Verify it manually in R Studio using the following code. ``` Sys.setenv(SPARK_HOME="/Users/jzhang/github/spark") .libPaths(c(file.path(Sys.getenv(), "R", "lib"), .libPaths())) library(SparkR) sparkR.session(master="yarn-client", sparkConfig = list(spark.executor.instances="1")) df <- as.DataFrame(mtcars) head(df) ``` ⦠Author: Jeff ZhangCloses #14784 from zjffdu/SPARK-17210. 
(cherry picked from commit f62ddc5983a08d4d54c0a9a8210dd6cbec555671) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a8ea000 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a8ea000 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a8ea000 Branch: refs/heads/branch-2.0 Commit: 1a8ea000e7e16bdee54c47ab0f5e197c15f200a6 Parents: d3f90e7 Author: Jeff Zhang Authored: Fri Sep 23 11:37:43 2016 -0700 Committer: Felix Cheung Committed: Fri Sep 23 11:38:21 2016 -0700 -- R/pkg/R/sparkR.R | 4 docs/sparkr.md | 15 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a8ea000/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 0601536..cc6d591 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -491,6 +491,10 @@ sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory" sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path" sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- "--driver-java-options" sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-path" +sparkConfToSubmitOps[["spark.master"]] <- "--master" +sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab" +sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal" + # Utility function that returns Spark Submit arguments as a string # http://git-wip-us.apache.org/repos/asf/spark/blob/1a8ea000/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index b881119..340e7f7 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -63,6 +63,21 @@ The following Spark driver properties can be set in `sparkConfig` with `sparkR.s Property NameProperty groupspark-submit equivalent +spark.master +Application Properties +--master + + +spark.yarn.keytab +Application Properties +--keytab + + +spark.yarn.principal +Application Properties +--principal + + spark.driver.memory Application Properties --driver-memory - To unsubscribe, e-mail:
spark git commit: [SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running sparkr in RStudio
Repository: spark Updated Branches: refs/heads/master f89808b0f -> f62ddc598 [SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running sparkr in RStudio ## What changes were proposed in this pull request? Spark will add sparkr.zip to archive only when it is yarn mode (SparkSubmit.scala). ``` if (args.isR && clusterManager == YARN) { val sparkRPackagePath = RUtils.localSparkRPackagePath if (sparkRPackagePath.isEmpty) { printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.") } val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE) if (!sparkRPackageFile.exists()) { printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.") } val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString // Distribute the SparkR package. // Assigns a symbol link name "sparkr" to the shipped package. args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr") // Distribute the R package archive containing all the built R packages. if (!RUtils.rPackages.isEmpty) { val rPackageFile = RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE) if (!rPackageFile.exists()) { printErrorAndExit("Failed to zip all the built R packages.") } val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString // Assigns a symbol link name "rpkg" to the shipped package. args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg") } } ``` So it is necessary to pass spark.master from R process to JVM. Otherwise sparkr.zip won't be distributed to executor. Besides that I also pass spark.yarn.keytab/spark.yarn.principal to spark side, because JVM process need them to access secured cluster. ## How was this patch tested? Verify it manually in R Studio using the following code. ``` Sys.setenv(SPARK_HOME="/Users/jzhang/github/spark") .libPaths(c(file.path(Sys.getenv(), "R", "lib"), .libPaths())) library(SparkR) sparkR.session(master="yarn-client", sparkConfig = list(spark.executor.instances="1")) df <- as.DataFrame(mtcars) head(df) ``` ⦠Author: Jeff ZhangCloses #14784 from zjffdu/SPARK-17210. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f62ddc59 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f62ddc59 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f62ddc59 Branch: refs/heads/master Commit: f62ddc5983a08d4d54c0a9a8210dd6cbec555671 Parents: f89808b Author: Jeff Zhang Authored: Fri Sep 23 11:37:43 2016 -0700 Committer: Felix Cheung Committed: Fri Sep 23 11:37:43 2016 -0700 -- R/pkg/R/sparkR.R | 4 docs/sparkr.md | 15 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f62ddc59/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 0601536..cc6d591 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -491,6 +491,10 @@ sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory" sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path" sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- "--driver-java-options" sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-path" +sparkConfToSubmitOps[["spark.master"]] <- "--master" +sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab" +sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal" + # Utility function that returns Spark Submit arguments as a string # http://git-wip-us.apache.org/repos/asf/spark/blob/f62ddc59/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index b881119..340e7f7 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -63,6 +63,21 @@ The following Spark driver properties can be set in `sparkConfig` with `sparkR.s Property NameProperty groupspark-submit equivalent +spark.master +Application Properties +--master + + +spark.yarn.keytab +Application Properties +--keytab + + +spark.yarn.principal +Application Properties +--principal + + spark.driver.memory Application Properties --driver-memory - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17499][SPARKR][ML][MLLIB] make the default params in sparkR spark.mlp consistent with MultilayerPerceptronClassifier
Repository: spark Updated Branches: refs/heads/master 90d575421 -> f89808b0f [SPARK-17499][SPARKR][ML][MLLIB] make the default params in sparkR spark.mlp consistent with MultilayerPerceptronClassifier ## What changes were proposed in this pull request? update `MultilayerPerceptronClassifierWrapper.fit` paramter type: `layers: Array[Int]` `seed: String` update several default params in sparkR `spark.mlp`: `tol` --> 1e-6 `stepSize` --> 0.03 `seed` --> NULL ( when seed == NULL, the scala-side wrapper regard it as a `null` value and the seed will use the default one ) r-side `seed` only support 32bit integer. remove `layers` default value, and move it in front of those parameters with default value. add `layers` parameter validation check. ## How was this patch tested? tests added. Author: WeichenXuCloses #15051 from WeichenXu123/update_py_mlp_default. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f89808b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f89808b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f89808b0 Branch: refs/heads/master Commit: f89808b0fdbc04e1bdff1489a6ec4c84ddb2adc4 Parents: 90d5754 Author: WeichenXu Authored: Fri Sep 23 11:14:22 2016 -0700 Committer: Felix Cheung Committed: Fri Sep 23 11:14:22 2016 -0700 -- R/pkg/R/mllib.R | 13 ++--- R/pkg/inst/tests/testthat/test_mllib.R | 19 +++ .../MultilayerPerceptronClassifierWrapper.scala | 8 3 files changed, 33 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f89808b0/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 98db367..971c166 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -694,12 +694,19 @@ setMethod("predict", signature(object = "KMeansModel"), #' } #' @note spark.mlp since 2.1.0 setMethod("spark.mlp", signature(data = "SparkDataFrame"), - function(data, blockSize = 128, layers = c(3, 5, 2), solver = "l-bfgs", maxIter = 100, - tol = 0.5, stepSize = 1, seed = 1) { + function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100, + tol = 1E-6, stepSize = 0.03, seed = NULL) { +layers <- as.integer(na.omit(layers)) +if (length(layers) <= 1) { + stop ("layers must be a integer vector with length > 1.") +} +if (!is.null(seed)) { + seed <- as.character(as.integer(seed)) +} jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper", "fit", data@sdf, as.integer(blockSize), as.array(layers), as.character(solver), as.integer(maxIter), as.numeric(tol), -as.numeric(stepSize), as.integer(seed)) +as.numeric(stepSize), seed) new("MultilayerPerceptronClassificationModel", jobj = jobj) }) http://git-wip-us.apache.org/repos/asf/spark/blob/f89808b0/R/pkg/inst/tests/testthat/test_mllib.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 24c40a8..a1eaaf2 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -391,6 +391,25 @@ test_that("spark.mlp", { unlink(modelPath) + # Test default parameter + model <- spark.mlp(df, layers = c(4, 5, 4, 3)) + mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) + expect_equal(head(mlpPredictions$prediction, 10), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 0)) + + # Test illegal parameter + expect_error(spark.mlp(df, layers = NULL), "layers must be a integer vector with length > 1.") + expect_error(spark.mlp(df, layers = c()), "layers must be a integer vector with length > 1.") + expect_error(spark.mlp(df, layers = c(3)), 
"layers must be a integer vector with length > 1.") + + # Test random seed + # default seed + model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10) + mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) + expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 2, 0, 1)) + # seed equals 10 + model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10) + mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) + expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 2, 1, 2, 2, 1, 0, 0, 1)) }) test_that("spark.naiveBayes", {
spark git commit: [SPARK-16861][PYSPARK][CORE] Refactor PySpark accumulator API on top of Accumulator V2
Repository: spark Updated Branches: refs/heads/master 5c5396cb4 -> 90d575421 [SPARK-16861][PYSPARK][CORE] Refactor PySpark accumulator API on top of Accumulator V2 ## What changes were proposed in this pull request? Move the internals of the PySpark accumulator API from the old deprecated API on top of the new accumulator API. ## How was this patch tested? The existing PySpark accumulator tests (both unit tests and doc tests at the start of accumulator.py). Author: Holden KarauCloses #14467 from holdenk/SPARK-16861-refactor-pyspark-accumulator-api. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d57542 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d57542 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d57542 Branch: refs/heads/master Commit: 90d5754212425d55f992c939a2bc7d9ac6ef92b8 Parents: 5c5396c Author: Holden Karau Authored: Fri Sep 23 09:44:30 2016 +0100 Committer: Sean Owen Committed: Fri Sep 23 09:44:30 2016 +0100 -- .../org/apache/spark/api/python/PythonRDD.scala | 42 +++- python/pyspark/context.py | 5 +-- 2 files changed, 25 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90d57542/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index d841091..0ca91b9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -20,7 +20,7 @@ package org.apache.spark.api.python import java.io._ import java.net._ import java.nio.charset.StandardCharsets -import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap} +import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -38,7 +38,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util._ private[spark] class PythonRDD( @@ -75,7 +75,7 @@ private[spark] case class PythonFunction( pythonExec: String, pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], -accumulator: Accumulator[JList[Array[Byte]]]) +accumulator: PythonAccumulatorV2) /** * A wrapper for chained Python functions (from bottom to top). @@ -200,7 +200,7 @@ private[spark] class PythonRunner( val updateLen = stream.readInt() val update = new Array[Byte](updateLen) stream.readFully(update) -accumulator += Collections.singletonList(update) +accumulator.add(update) } // Check whether the worker is ready to be re-used. if (stream.readInt() == SpecialLengths.END_OF_STREAM) { @@ -461,7 +461,7 @@ private[spark] object PythonRDD extends Logging { JavaRDD[Array[Byte]] = { val file = new DataInputStream(new FileInputStream(filename)) try { - val objs = new collection.mutable.ArrayBuffer[Array[Byte]] + val objs = new mutable.ArrayBuffer[Array[Byte]] try { while (true) { val length = file.readInt() @@ -866,11 +866,13 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By } /** - * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it + * Internal class that acts as an `AccumulatorV2` for Python accumulators. 
Inside, it * collects a list of pickled strings that we pass to Python through a socket. */ -private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int) - extends AccumulatorParam[JList[Array[Byte]]] { +private[spark] class PythonAccumulatorV2( +@transient private val serverHost: String, +private val serverPort: Int) + extends CollectionAccumulator[Array[Byte]] { Utils.checkHost(serverHost, "Expected hostname") @@ -880,30 +882,33 @@ private class PythonAccumulatorParam(@transient private val serverHost: String, * We try to reuse a single Socket to transfer accumulator updates, as they are all added * by the DAGScheduler's single-threaded RpcEndpoint anyway. */ - @transient var socket: Socket = _ + @transient private var socket: Socket = _ - def openSocket(): Socket = synchronized { + private def openSocket(): Socket =
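For context, the AccumulatorV2 contract that PythonAccumulatorV2 builds on (by extending CollectionAccumulator) looks roughly like the minimal, self-contained accumulator below. This is an illustrative sketch, not the Spark class itself; PythonAccumulatorV2 additionally ships the merged updates to the Python process over a socket.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.AccumulatorV2

// A collection-style accumulator: gathers byte payloads on executors and merges them on the driver.
class BytesAccumulator extends AccumulatorV2[Array[Byte], Seq[Array[Byte]]] {
  private val buffer = ArrayBuffer.empty[Array[Byte]]

  override def isZero: Boolean = buffer.isEmpty
  override def copy(): BytesAccumulator = {
    val copied = new BytesAccumulator
    copied.buffer ++= buffer
    copied
  }
  override def reset(): Unit = buffer.clear()
  override def add(v: Array[Byte]): Unit = buffer += v
  override def merge(other: AccumulatorV2[Array[Byte], Seq[Array[Byte]]]): Unit =
    buffer ++= other.value
  override def value: Seq[Array[Byte]] = buffer.toSeq
}

// Usage, assuming an active SparkContext named sc:
//   val acc = new BytesAccumulator
//   sc.register(acc, "python-updates")
```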
spark git commit: [BUILD] Closes some stale PRs
Repository: spark Updated Branches: refs/heads/master 62ccf27ab -> 5c5396cb4 [BUILD] Closes some stale PRs ## What changes were proposed in this pull request? This PR proposes to close some stale PRs and ones suggested to be closed by committer(s). Closes #12415 Closes #14765 Closes #15118 Closes #15184 Closes #15183 Closes #9440 Closes #15023 Closes #14643 Closes #14827 ## How was this patch tested? N/A Author: hyukjinkwon. Closes #15198 from HyukjinKwon/stale-prs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c5396cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c5396cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c5396cb Branch: refs/heads/master Commit: 5c5396cb4725ba5ceee26ed885e8b941d219757b Parents: 62ccf27 Author: hyukjinkwon Authored: Fri Sep 23 09:41:50 2016 +0100 Committer: Sean Owen Committed: Fri Sep 23 09:41:50 2016 +0100 -- -- - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
Repository: spark Updated Branches: refs/heads/branch-2.0 54d4eee51 -> d3f90e71a [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry ## What changes were proposed in this pull request? Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example). ## How was this patch tested? Jenkins. Author: Shixiong ZhuCloses #15206 from zsxwing/cleanup. (cherry picked from commit 62ccf27ab4b55e734646678ae78b7e812262d14b) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3f90e71 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3f90e71 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3f90e71 Branch: refs/heads/branch-2.0 Commit: d3f90e71af57162afc0648adbc52b810a883ceac Parents: 54d4eee Author: Shixiong Zhu Authored: Thu Sep 22 23:35:08 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 23:35:15 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 37 ++-- .../streaming/FileStreamSourceSuite.scala | 24 ++--- 2 files changed, 31 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3f90e71/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 4515f9a..8c3e718 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -59,7 +59,7 @@ class FileStreamSource( val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) metadataLog.allFiles().foreach { entry => -seenFiles.add(entry) +seenFiles.add(entry.path, entry.timestamp) } seenFiles.purge() @@ -73,14 +73,16 @@ class FileStreamSource( */ private def fetchMaxOffset(): LongOffset = synchronized { // All the new files found - ignore aged files and files that we have seen. -val newFiles = fetchAllFiles().filter(seenFiles.isNewFile) +val newFiles = fetchAllFiles().filter { + case (path, timestamp) => seenFiles.isNewFile(path, timestamp) +} // Obey user's setting to limit the number of files in this batch trigger. val batchFiles = if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles batchFiles.foreach { file => - seenFiles.add(file) + seenFiles.add(file._1, file._2) logDebug(s"New file: $file") } val numPurged = seenFiles.purge() @@ -95,7 +97,9 @@ class FileStreamSource( if (batchFiles.nonEmpty) { maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray) + metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) => +FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId) + }.toArray) logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } @@ -140,12 +144,12 @@ class FileStreamSource( /** * Returns a list of files found, sorted by their timestamp. 
*/ - private def fetchAllFiles(): Seq[FileEntry] = { + private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => - FileEntry(status.getPath.toUri.toString, status.getModificationTime) + (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime val listingTimeMs = (endTime.toDouble - startTime) / 100 @@ -172,10 +176,7 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - val NOT_SET = -1L - - case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET) -extends Serializable + case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable /** * A custom hash map used to track the list of files seen. This map is not thread-safe. @@ -196,10 +197,10 @@ object
spark git commit: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
Repository: spark Updated Branches: refs/heads/master 947b8c6e3 -> 62ccf27ab [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry ## What changes were proposed in this pull request? Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example). ## How was this patch tested? Jenkins. Author: Shixiong ZhuCloses #15206 from zsxwing/cleanup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ccf27a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ccf27a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ccf27a Branch: refs/heads/master Commit: 62ccf27ab4b55e734646678ae78b7e812262d14b Parents: 947b8c6 Author: Shixiong Zhu Authored: Thu Sep 22 23:35:08 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 23:35:08 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 37 ++-- .../streaming/FileStreamSourceSuite.scala | 24 ++--- 2 files changed, 31 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/62ccf27a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 5ebc083..be02327 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -59,7 +59,7 @@ class FileStreamSource( val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) metadataLog.allFiles().foreach { entry => -seenFiles.add(entry) +seenFiles.add(entry.path, entry.timestamp) } seenFiles.purge() @@ -73,14 +73,16 @@ class FileStreamSource( */ private def fetchMaxOffset(): LongOffset = synchronized { // All the new files found - ignore aged files and files that we have seen. -val newFiles = fetchAllFiles().filter(seenFiles.isNewFile) +val newFiles = fetchAllFiles().filter { + case (path, timestamp) => seenFiles.isNewFile(path, timestamp) +} // Obey user's setting to limit the number of files in this batch trigger. val batchFiles = if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles batchFiles.foreach { file => - seenFiles.add(file) + seenFiles.add(file._1, file._2) logDebug(s"New file: $file") } val numPurged = seenFiles.purge() @@ -95,7 +97,9 @@ class FileStreamSource( if (batchFiles.nonEmpty) { maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray) + metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) => +FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId) + }.toArray) logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } @@ -140,12 +144,12 @@ class FileStreamSource( /** * Returns a list of files found, sorted by their timestamp. 
*/ - private def fetchAllFiles(): Seq[FileEntry] = { + private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => - FileEntry(status.getPath.toUri.toString, status.getModificationTime) + (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime val listingTimeMs = (endTime.toDouble - startTime) / 100 @@ -172,10 +176,7 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - val NOT_SET = -1L - - case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET) -extends Serializable + case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable /** * A custom hash map used to track the list of files seen. This map is not thread-safe. @@ -196,10 +197,10 @@ object FileStreamSource { private var lastPurgeTimestamp: Timestamp = 0L /** Add a new file to the map. */ -def add(file:
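The SeenFilesMap this change touches is conceptually simple bookkeeping: remember the (path, modification time) pairs already processed and periodically purge entries older than maxFileAgeMs so the map cannot grow without bound. A hedged stand-alone sketch of that idea (not the actual Spark class):

```scala
import scala.collection.mutable

class SeenFiles(maxAgeMs: Long) {
  private val seen = mutable.HashMap.empty[String, Long]
  private var purgeBoundary = Long.MinValue

  /** Record a file that has been handed to a batch. */
  def add(path: String, timestamp: Long): Unit = seen(path) = timestamp

  /** A file is new only if it is untracked and not older than the last purge boundary. */
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= purgeBoundary && !seen.contains(path)

  /** Forget entries older than maxAgeMs relative to the newest timestamp seen; returns the count removed. */
  def purge(): Int = {
    if (seen.isEmpty) return 0
    purgeBoundary = seen.values.max - maxAgeMs
    val stale = seen.collect { case (path, ts) if ts < purgeBoundary => path }.toSeq
    stale.foreach(seen.remove)
    stale.size
  }
}

val files = new SeenFiles(maxAgeMs = 60000L)
files.add("s3://bucket/a.json", timestamp = 1000L)
println(files.isNewFile("s3://bucket/a.json", 1000L))  // false: already tracked
println(files.isNewFile("s3://bucket/b.json", 2000L))  // true
```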