spark git commit: [SPARK-3722][Docs] Minor improvement and fix in docs
Repository: spark Updated Branches: refs/heads/master 825709a0b - e421072da [SPARK-3722][Docs]minor improvement and fix in docs https://issues.apache.org/jira/browse/SPARK-3722 Author: WangTao barneystin...@aliyun.com Closes #2579 from WangTaoTheTonic/docsWork and squashes the following commits: 6f91cec [WangTao] use more wording express 29d22fa [WangTao] delete the specified version link 34cb4ea [WangTao] Update running-on-yarn.md 4ee1a26 [WangTao] minor improvement and fix in docs Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e421072d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e421072d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e421072d Branch: refs/heads/master Commit: e421072da0ea87e7056cc3f2130ddaafc731530f Parents: 825709a Author: WangTao barneystin...@aliyun.com Authored: Fri Nov 14 08:09:42 2014 -0600 Committer: Thomas Graves tgra...@apache.org Committed: Fri Nov 14 08:09:42 2014 -0600 -- docs/configuration.md | 2 +- docs/running-on-yarn.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e421072d/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index f0b396e..8839162 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -52,7 +52,7 @@ Then, you can supply configuration values at runtime: --conf spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps myApp.jar {% endhighlight %} -The Spark shell and [`spark-submit`](cluster-overview.html#launching-applications-with-spark-submit) +The Spark shell and [`spark-submit`](submitting-applications.html) tool support two ways to load configurations dynamically. The first are command line options, such as `--master`, as shown above. `spark-submit` can accept any Spark property using the `--conf` flag, but uses special flags for properties that play a part in launching the Spark application. http://git-wip-us.apache.org/repos/asf/spark/blob/e421072d/docs/running-on-yarn.md -- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 2f7e498..dfe2db4 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -39,7 +39,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes tdcodespark.yarn.preserve.staging.files/code/td tdfalse/td td -Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather then delete them. +Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them. /td /tr tr @@ -159,7 +159,7 @@ For example: lib/spark-examples*.jar \ 10 -The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the Viewing Logs section below for how to see driver and executor logs. +The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. 
Refer to the Debugging your Application section below for how to see driver and executor logs. To launch a Spark application in yarn-client mode, do the same, but replace yarn-cluster with yarn-client. To run spark-shell: @@ -181,7 +181,7 @@ In YARN terminology, executors and application masters run inside containers. yarn logs -applicationId app ID -will print out the contents of all log files from all containers from the given application. +will print out the contents of all log files from all containers from the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (`yarn.nodemanager.remote-app-log-dir` and `yarn.nodemanager.remote-app-log-dir-suffix`). When log aggregation isn't turned on, logs are retained locally on each machine under `YARN_APP_LOGS_DIR`, which is usually configured to `/tmp/logs` or `$HADOOP_HOME/logs/userlogs` depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this
spark git commit: Revert "[SPARK-2703][Core]Make Tachyon related unit tests execute without deploying a Tachyon system locally."
Repository: spark Updated Branches: refs/heads/branch-1.2 39257ca1b - 3219271f4 Revert [SPARK-2703][Core]Make Tachyon related unit tests execute without deploying a Tachyon system locally. This reverts commit c127ff8c87fc4f3aa6f09697928832dc6d37cc0f. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3219271f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3219271f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3219271f Branch: refs/heads/branch-1.2 Commit: 3219271f403091d4d3af4cddd08121ba538a459b Parents: 39257ca Author: Patrick Wendell pwend...@gmail.com Authored: Fri Nov 14 12:34:21 2014 -0800 Committer: Patrick Wendell pwend...@gmail.com Committed: Fri Nov 14 12:34:21 2014 -0800 -- core/pom.xml | 7 --- .../org/apache/spark/storage/BlockManagerSuite.scala | 11 ++- project/SparkBuild.scala | 2 -- 3 files changed, 2 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3219271f/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 03eb231..492eddd 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -226,13 +226,6 @@ /dependency dependency groupIdorg.tachyonproject/groupId - artifactIdtachyon/artifactId - version0.5.0/version - typetest-jar/type - scopetest/scope -/dependency -dependency - groupIdorg.tachyonproject/groupId artifactIdtachyon-client/artifactId version0.5.0/version exclusions http://git-wip-us.apache.org/repos/asf/spark/blob/3219271f/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c5e6ccc..5554efb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -36,7 +36,6 @@ import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ -import tachyon.master.LocalTachyonCluster import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager} import org.apache.spark.executor.DataReadMethod @@ -537,14 +536,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test(tachyon storage) { -val tachyonUnitTestEnabled = conf.getBoolean(spark.test.tachyon.enable, true) +// TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar. +val tachyonUnitTestEnabled = conf.getBoolean(spark.test.tachyon.enable, false) if (tachyonUnitTestEnabled) { - val tachyonCluster = new LocalTachyonCluster(3000) - tachyonCluster.start() - val tachyonURL = tachyon.Constants.HEADER + -tachyonCluster.getMasterHostname() + : + tachyonCluster.getMasterPort() - conf.set(spark.tachyonStore.url, tachyonURL) - conf.set(spark.tachyonStore.folderName, app-test) store = makeBlockManager(1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -555,7 +549,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(store.getSingle(a3).isDefined, a3 was in store) assert(store.getSingle(a2).isDefined, a2 was in store) assert(store.getSingle(a1).isDefined, a1 was in store) - tachyonCluster.stop() } else { info(tachyon storage test disabled.) 
} http://git-wip-us.apache.org/repos/asf/spark/blob/3219271f/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index bbba642..d95d50a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -389,8 +389,6 @@ object TestSettings { testOptions += Tests.Argument(TestFrameworks.JUnit, -v, -a), // Enable Junit testing. libraryDependencies += com.novocode % junit-interface % 0.9 % test, -// Enable Tachyon local testing. -libraryDependencies += org.tachyonproject % tachyon % 0.5.0 % test classifier tests, // Only allow one test at a time, even across projects, since they run in the same JVM parallelExecution in Test := false, concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), - To unsubscribe, e-mail:
spark git commit: [SPARK-4398][PySpark] specialize sc.parallelize(xrange)
Repository: spark Updated Branches: refs/heads/master 77e845ca7 - abd581752 [SPARK-4398][PySpark] specialize sc.parallelize(xrange) `sc.parallelize(range(1 20), 1).count()` may take 15 seconds to finish and the rdd object stores the entire list, making task size very large. This PR adds a specialized version for xrange. JoshRosen davies Author: Xiangrui Meng m...@databricks.com Closes #3264 from mengxr/SPARK-4398 and squashes the following commits: 8953c41 [Xiangrui Meng] follow davies' suggestion cbd58e3 [Xiangrui Meng] specialize sc.parallelize(xrange) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abd58175 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abd58175 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abd58175 Branch: refs/heads/master Commit: abd581752f9314791a688690c07ad1bb68cc09fe Parents: 77e845c Author: Xiangrui Meng m...@databricks.com Authored: Fri Nov 14 12:43:17 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Nov 14 12:43:17 2014 -0800 -- python/pyspark/context.py | 25 + 1 file changed, 21 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/abd58175/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index faa5952..b6c9914 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -289,12 +289,29 @@ class SparkContext(object): def parallelize(self, c, numSlices=None): -Distribute a local Python collection to form an RDD. +Distribute a local Python collection to form an RDD. Using xrange +is recommended if the input represents a range for performance. - sc.parallelize(range(5), 5).glom().collect() -[[0], [1], [2], [3], [4]] + sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect() +[[0], [2], [3], [4], [6]] + sc.parallelize(xrange(0, 6, 2), 5).glom().collect() +[[], [0], [], [2], [4]] -numSlices = numSlices or self.defaultParallelism +numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism +if isinstance(c, xrange): +size = len(c) +if size == 0: +return self.parallelize([], numSlices) +step = c[1] - c[0] if size 1 else 1 +start0 = c[0] + +def getStart(split): +return start0 + (split * size / numSlices) * step + +def f(split, iterator): +return xrange(getStart(split), getStart(split + 1), step) + +return self.parallelize([], numSlices).mapPartitionsWithIndex(f) # Calling the Java parallelize() method with an ArrayList is too slow, # because it sends O(n) Py4J commands. As an alternative, serialized # objects are written to a file and loaded through textFile(). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4398][PySpark] specialize sc.parallelize(xrange)
Repository: spark Updated Branches: refs/heads/branch-1.2 3219271f4 - 3014803ea [SPARK-4398][PySpark] specialize sc.parallelize(xrange) `sc.parallelize(range(1 20), 1).count()` may take 15 seconds to finish and the rdd object stores the entire list, making task size very large. This PR adds a specialized version for xrange. JoshRosen davies Author: Xiangrui Meng m...@databricks.com Closes #3264 from mengxr/SPARK-4398 and squashes the following commits: 8953c41 [Xiangrui Meng] follow davies' suggestion cbd58e3 [Xiangrui Meng] specialize sc.parallelize(xrange) (cherry picked from commit abd581752f9314791a688690c07ad1bb68cc09fe) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3014803e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3014803e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3014803e Branch: refs/heads/branch-1.2 Commit: 3014803ead0aac31f36f4387c919174877525ff4 Parents: 3219271 Author: Xiangrui Meng m...@databricks.com Authored: Fri Nov 14 12:43:17 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Nov 14 12:43:25 2014 -0800 -- python/pyspark/context.py | 25 + 1 file changed, 21 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3014803e/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index faa5952..b6c9914 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -289,12 +289,29 @@ class SparkContext(object): def parallelize(self, c, numSlices=None): -Distribute a local Python collection to form an RDD. +Distribute a local Python collection to form an RDD. Using xrange +is recommended if the input represents a range for performance. - sc.parallelize(range(5), 5).glom().collect() -[[0], [1], [2], [3], [4]] + sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect() +[[0], [2], [3], [4], [6]] + sc.parallelize(xrange(0, 6, 2), 5).glom().collect() +[[], [0], [], [2], [4]] -numSlices = numSlices or self.defaultParallelism +numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism +if isinstance(c, xrange): +size = len(c) +if size == 0: +return self.parallelize([], numSlices) +step = c[1] - c[0] if size 1 else 1 +start0 = c[0] + +def getStart(split): +return start0 + (split * size / numSlices) * step + +def f(split, iterator): +return xrange(getStart(split), getStart(split + 1), step) + +return self.parallelize([], numSlices).mapPartitionsWithIndex(f) # Calling the Java parallelize() method with an ArrayList is too slow, # because it sends O(n) Py4J commands. As an alternative, serialized # objects are written to a file and loaded through textFile(). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [Spark Core] SPARK-4380 Edit spilling log from MB to B
Repository: spark Updated Branches: refs/heads/master abd581752 - 0c56a039a [Spark Core] SPARK-4380 Edit spilling log from MB to B https://issues.apache.org/jira/browse/SPARK-4380 Author: Hong Shen hongs...@tencent.com Closes #3243 from shenh062326/spark_change and squashes the following commits: 4653378 [Hong Shen] Edit spilling log from MB to B 21ee960 [Hong Shen] Edit spilling log from MB to B e9145e8 [Hong Shen] Edit spilling log from MB to B da761c2 [Hong Shen] Edit spilling log from MB to B 946351c [Hong Shen] Edit spilling log from MB to B Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c56a039 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c56a039 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c56a039 Branch: refs/heads/master Commit: 0c56a039a9c5b871422f0fc55ff4394bc077fb34 Parents: abd5817 Author: Hong Shen hongs...@tencent.com Authored: Fri Nov 14 13:29:41 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:29:41 2014 -0800 -- .../main/scala/org/apache/spark/util/collection/Spillable.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c56a039/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index d7dccd4..0e4c6d6 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -105,7 +105,8 @@ private[spark] trait Spillable[C] { */ @inline private def logSpillage(size: Long) { val threadId = Thread.currentThread().getId -logInfo(Thread %d spilling in-memory map of %d MB to disk (%d time%s so far) -.format(threadId, size / (1024 * 1024), _spillCount, if (_spillCount 1) s else )) +logInfo(Thread %d spilling in-memory map of %s to disk (%d time%s so far) +.format(threadId, org.apache.spark.util.Utils.bytesToString(size), +_spillCount, if (_spillCount 1) s else )) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
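The change above swaps the hard-coded megabyte arithmetic for a human-readable size string: with integer division, any spill smaller than 1 MB was logged as "0 MB". The snippet below is a minimal standalone sketch of that idea, not Spark's internal `Utils.bytesToString` implementation:

```scala
// Minimal standalone sketch of a human-readable byte formatter; this is NOT
// Spark's internal Utils.bytesToString, only an illustration of why the log
// message was changed from a fixed "MB" unit.
object SpillLogFormatSketch {
  private val KB = 1L << 10
  private val MB = 1L << 20
  private val GB = 1L << 30

  def bytesToString(size: Long): String =
    if (size >= GB) "%.1f GB".format(size.toDouble / GB)
    else if (size >= MB) "%.1f MB".format(size.toDouble / MB)
    else if (size >= KB) "%.1f KB".format(size.toDouble / KB)
    else s"$size B"

  def main(args: Array[String]): Unit = {
    val spilled = 480L * 1024 // a 480 KB in-memory map
    // Old message: integer division reports anything under 1 MB as "0 MB".
    println("spilling in-memory map of %d MB to disk".format(spilled / (1024 * 1024)))
    // New message: the unit adapts to the value.
    println("spilling in-memory map of %s to disk".format(bytesToString(spilled)))
  }
}
```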
spark git commit: SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR
Repository: spark Updated Branches: refs/heads/master 0c56a039a - 5c265ccde SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR These descriptions are from the header of spark-daemon.sh Author: Andrew Ash and...@andrewash.com Closes #2518 from ash211/SPARK-3663 and squashes the following commits: 058b257 [Andrew Ash] Complete hanging clause in SPARK_PID_DIR description a17cb4b [Andrew Ash] Update docs for default locations per SPARK-4110 af89096 [Andrew Ash] SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c265ccd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c265ccd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c265ccd Branch: refs/heads/master Commit: 5c265ccde0c5594899ec61f9c1ea100ddff52da7 Parents: 0c56a03 Author: Andrew Ash and...@andrewash.com Authored: Fri Nov 14 13:33:35 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:33:35 2014 -0800 -- conf/spark-env.sh.template | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c265ccd/conf/spark-env.sh.template -- diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index f8ffbf6..0886b02 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -28,7 +28,7 @@ # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job. # - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job. -# Options for the daemons used in the standalone deploy mode: +# Options for the daemons used in the standalone deploy mode # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. -Dx=y) @@ -41,3 +41,10 @@ # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. -Dx=y) # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. -Dx=y) # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers + +# Generic options for the daemons used in the standalone deploy mode +# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf) +# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs) +# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp) +# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER) +# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR
Repository: spark Updated Branches: refs/heads/branch-1.2 d579b3989 - 204eaf165 SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR These descriptions are from the header of spark-daemon.sh Author: Andrew Ash and...@andrewash.com Closes #2518 from ash211/SPARK-3663 and squashes the following commits: 058b257 [Andrew Ash] Complete hanging clause in SPARK_PID_DIR description a17cb4b [Andrew Ash] Update docs for default locations per SPARK-4110 af89096 [Andrew Ash] SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR (cherry picked from commit 5c265ccde0c5594899ec61f9c1ea100ddff52da7) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/204eaf16 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/204eaf16 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/204eaf16 Branch: refs/heads/branch-1.2 Commit: 204eaf1653b2bdd0befe364392baa32c31ce0d3e Parents: d579b39 Author: Andrew Ash and...@andrewash.com Authored: Fri Nov 14 13:33:35 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:33:43 2014 -0800 -- conf/spark-env.sh.template | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/204eaf16/conf/spark-env.sh.template -- diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index f8ffbf6..0886b02 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -28,7 +28,7 @@ # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job. # - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job. -# Options for the daemons used in the standalone deploy mode: +# Options for the daemons used in the standalone deploy mode # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. -Dx=y) @@ -41,3 +41,10 @@ # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. -Dx=y) # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. -Dx=y) # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers + +# Generic options for the daemons used in the standalone deploy mode +# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf) +# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs) +# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp) +# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER) +# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in yarn-cluster mode
Repository: spark Updated Branches: refs/heads/master 5c265ccde - 156cf [SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in yarn-cluster mode In yarn-cluster mode, the Web UI is running behind a yarn proxy server. Some features(or bugs?) of yarn proxy server will break the links for thread dump. 1. Yarn proxy server will do http redirect internally, so if opening `http://example.com:8088/cluster/app/application_1415344371838_0012/executors`, it will fetch `http://example.com:8088/cluster/app/application_1415344371838_0012/executors/` and return the content but won't change the link in the browser. Then when a user clicks `Thread Dump`, it will jump to `http://example.com:8088/proxy/application_1415344371838_0012/threadDump/?executorId=2`. This is a wrong link. The correct link should be `http://example.com:8088/proxy/application_1415344371838_0012/executors/threadDump/?executorId=2`. Adding / to the tab links will fix it. 2. Yarn proxy server has a bug about the URL encode/decode. When a user accesses `http://example.com:8088/proxy/application_1415344371838_0006/executors/threadDump/?executorId=%3Cdriver%3E`, the yarn proxy server will require `http://example.com:36429/executors/threadDump/?executorId=%25253Cdriver%25253E`. But Spark web server expects `http://example.com:36429/executors/threadDump/?executorId=%3Cdriver%3E`. Related to [YARN-2844](https://issues.apache.org/jira/browse/YARN-2844). For now, it's a tricky approach to bypass the yarn bug. ![threaddump](https://cloud.githubusercontent.com/assets/1000778/4972567/d1ccba64-68ad-11e4-983e-257530cef35a.png) Author: zsxwing zsxw...@gmail.com Closes #3183 from zsxwing/SPARK-4313 and squashes the following commits: 3379ca8 [zsxwing] Encode the executor id in the thread dump link and update the comment abfa063 [zsxwing] Fix link issue of the executor thread dump page in yarn-cluster mode Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/156cf333 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/156cf333 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/156cf333 Branch: refs/heads/master Commit: 156cfdcd93304eb5240f5a6466a3a0311957 Parents: 5c265cc Author: zsxwing zsxw...@gmail.com Authored: Fri Nov 14 13:36:13 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:36:13 2014 -0800 -- .../src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../spark/ui/exec/ExecutorThreadDumpPage.scala | 15 ++- .../org/apache/spark/ui/exec/ExecutorsPage.scala | 4 +++- 3 files changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/156cf333/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 3312671..7bc1e24 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -175,7 +175,7 @@ private[spark] object UIUtils extends Logging { val shortAppName = if (appName.length 36) appName else appName.take(32) + ... 
val header = activeTab.headerTabs.map { tab = li class={if (tab == activeTab) active else } -a href={prependBaseUri(activeTab.basePath, / + tab.prefix)}{tab.name}/a +a href={prependBaseUri(activeTab.basePath, / + tab.prefix + /)}{tab.name}/a /li } http://git-wip-us.apache.org/repos/asf/spark/blob/156cf333/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index e9c755e..c82730f 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -17,6 +17,7 @@ package org.apache.spark.ui.exec +import java.net.URLDecoder import javax.servlet.http.HttpServletRequest import scala.util.Try @@ -29,7 +30,19 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage private val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { -val executorId = Option(request.getParameter(executorId)).getOrElse { +val executorId = Option(request.getParameter(executorId)).map { + executorId = +// Due to YARN-2844, driver in the url will be encoded to %25253Cdriver%25253E when +// running in yarn-cluster mode. `request.getParameter(executorId)` will
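The commit message above describes the YARN proxy adding an extra layer of URL encoding to the `executorId` parameter (YARN-2844). The snippet below is a rough, standalone sketch of that encode/decode round trip using `java.net.URLEncoder`/`URLDecoder`; the decode-until-stable loop only illustrates the idea of peeling off however many encoding layers were added and is not the exact logic of this patch:

```scala
import java.net.{URLDecoder, URLEncoder}

// Rough sketch of the round trip described in the commit message; not the
// exact logic of this patch.
object ProxyEncodingSketch {
  def main(args: Array[String]): Unit = {
    val executorId = "<driver>"

    // The executors page encodes the id once when building the thread-dump link.
    val inLink = URLEncoder.encode(executorId, "UTF-8")   // %3Cdriver%3E

    // A proxy that re-encodes the query string adds another layer (YARN-2844).
    val afterProxy = URLEncoder.encode(inLink, "UTF-8")   // %253Cdriver%253E

    // Decode until the value stops changing to recover the original id.
    var id = afterProxy
    var previous = ""
    while (id != previous) {
      previous = id
      id = URLDecoder.decode(id, "UTF-8")
    }
    println(id)  // <driver>
  }
}
```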
spark git commit: [SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in yarn-cluster mode
Repository: spark Updated Branches: refs/heads/branch-1.2 204eaf165 - 88278241e [SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in yarn-cluster mode In yarn-cluster mode, the Web UI is running behind a yarn proxy server. Some features(or bugs?) of yarn proxy server will break the links for thread dump. 1. Yarn proxy server will do http redirect internally, so if opening `http://example.com:8088/cluster/app/application_1415344371838_0012/executors`, it will fetch `http://example.com:8088/cluster/app/application_1415344371838_0012/executors/` and return the content but won't change the link in the browser. Then when a user clicks `Thread Dump`, it will jump to `http://example.com:8088/proxy/application_1415344371838_0012/threadDump/?executorId=2`. This is a wrong link. The correct link should be `http://example.com:8088/proxy/application_1415344371838_0012/executors/threadDump/?executorId=2`. Adding / to the tab links will fix it. 2. Yarn proxy server has a bug about the URL encode/decode. When a user accesses `http://example.com:8088/proxy/application_1415344371838_0006/executors/threadDump/?executorId=%3Cdriver%3E`, the yarn proxy server will require `http://example.com:36429/executors/threadDump/?executorId=%25253Cdriver%25253E`. But Spark web server expects `http://example.com:36429/executors/threadDump/?executorId=%3Cdriver%3E`. Related to [YARN-2844](https://issues.apache.org/jira/browse/YARN-2844). For now, it's a tricky approach to bypass the yarn bug. ![threaddump](https://cloud.githubusercontent.com/assets/1000778/4972567/d1ccba64-68ad-11e4-983e-257530cef35a.png) Author: zsxwing zsxw...@gmail.com Closes #3183 from zsxwing/SPARK-4313 and squashes the following commits: 3379ca8 [zsxwing] Encode the executor id in the thread dump link and update the comment abfa063 [zsxwing] Fix link issue of the executor thread dump page in yarn-cluster mode (cherry picked from commit 156cfdcd93304eb5240f5a6466a3a0311957) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88278241 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88278241 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88278241 Branch: refs/heads/branch-1.2 Commit: 88278241e9d9ca17db2f7c20d4434c32b7deff92 Parents: 204eaf1 Author: zsxwing zsxw...@gmail.com Authored: Fri Nov 14 13:36:13 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:36:20 2014 -0800 -- .../src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../spark/ui/exec/ExecutorThreadDumpPage.scala | 15 ++- .../org/apache/spark/ui/exec/ExecutorsPage.scala | 4 +++- 3 files changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88278241/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 3312671..7bc1e24 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -175,7 +175,7 @@ private[spark] object UIUtils extends Logging { val shortAppName = if (appName.length 36) appName else appName.take(32) + ... 
val header = activeTab.headerTabs.map { tab = li class={if (tab == activeTab) active else } -a href={prependBaseUri(activeTab.basePath, / + tab.prefix)}{tab.name}/a +a href={prependBaseUri(activeTab.basePath, / + tab.prefix + /)}{tab.name}/a /li } http://git-wip-us.apache.org/repos/asf/spark/blob/88278241/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index e9c755e..c82730f 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -17,6 +17,7 @@ package org.apache.spark.ui.exec +import java.net.URLDecoder import javax.servlet.http.HttpServletRequest import scala.util.Try @@ -29,7 +30,19 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage private val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { -val executorId = Option(request.getParameter(executorId)).getOrElse { +val executorId = Option(request.getParameter(executorId)).map { + executorId = +// Due to YARN-2844, driver in the url will be
spark git commit: Update failed assert text to match code in SizeEstimatorSuite
Repository: spark Updated Branches: refs/heads/master 156cf - c258db9ed Update failed assert text to match code in SizeEstimatorSuite Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com Closes #3242 from hammer/patch-1 and squashes the following commits: f88d635 [Jeff Hammerbacher] Update failed assert text to match code in SizeEstimatorSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c258db9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c258db9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c258db9e Branch: refs/heads/master Commit: c258db9ed4104b6eefe9f55f3e3959a3c46c2900 Parents: 156cf33 Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com Authored: Fri Nov 14 13:37:48 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:37:48 2014 -0800 -- core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c258db9e/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index f9d1af8..0ea2d13 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -118,7 +118,7 @@ class SizeEstimatorSuite // TODO: If we sample 100 elements, this should always be 4176 ? val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1)) assert(estimatedSize = 4000, Estimated size + estimatedSize + should be more than 4000) -assert(estimatedSize = 4200, Estimated size + estimatedSize + should be less than 4100) +assert(estimatedSize = 4200, Estimated size + estimatedSize + should be less than 4200) } test(32-bit arch) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Update failed assert text to match code in SizeEstimatorSuite
Repository: spark Updated Branches: refs/heads/branch-1.2 88278241e - e7f957437 Update failed assert text to match code in SizeEstimatorSuite Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com Closes #3242 from hammer/patch-1 and squashes the following commits: f88d635 [Jeff Hammerbacher] Update failed assert text to match code in SizeEstimatorSuite (cherry picked from commit c258db9ed4104b6eefe9f55f3e3959a3c46c2900) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7f95743 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7f95743 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7f95743 Branch: refs/heads/branch-1.2 Commit: e7f957437ad013d16992a7ab12da58fa8eb6a880 Parents: 8827824 Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com Authored: Fri Nov 14 13:37:48 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 13:37:56 2014 -0800 -- core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e7f95743/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index f9d1af8..0ea2d13 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -118,7 +118,7 @@ class SizeEstimatorSuite // TODO: If we sample 100 elements, this should always be 4176 ? val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1)) assert(estimatedSize = 4000, Estimated size + estimatedSize + should be more than 4000) -assert(estimatedSize = 4200, Estimated size + estimatedSize + should be less than 4100) +assert(estimatedSize = 4200, Estimated size + estimatedSize + should be less than 4200) } test(32-bit arch) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [SPARK-4239] [SQL] support view in HiveQl
Repository: spark Updated Branches: refs/heads/master c258db9ed - ade72c436 http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_cast-8-2cc0c576f0a008abf5bdf3308d500869 -- diff --git a/sql/hive/src/test/resources/golden/view_cast-8-2cc0c576f0a008abf5bdf3308d500869 b/sql/hive/src/test/resources/golden/view_cast-8-2cc0c576f0a008abf5bdf3308d500869 new file mode 100644 index 000..e69de29 http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_cast-9-f306bf3ad1c2a99f6f1843db44d7dfb4 -- diff --git a/sql/hive/src/test/resources/golden/view_cast-9-f306bf3ad1c2a99f6f1843db44d7dfb4 b/sql/hive/src/test/resources/golden/view_cast-9-f306bf3ad1c2a99f6f1843db44d7dfb4 new file mode 100644 index 000..e69de29 http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_inputs-0-9e67dfd1d595ab8b1935b789645f76c0 -- diff --git a/sql/hive/src/test/resources/golden/view_inputs-0-9e67dfd1d595ab8b1935b789645f76c0 b/sql/hive/src/test/resources/golden/view_inputs-0-9e67dfd1d595ab8b1935b789645f76c0 new file mode 100644 index 000..e69de29 http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_inputs-1-5af97e73bc3841793440105aae766bbe -- diff --git a/sql/hive/src/test/resources/golden/view_inputs-1-5af97e73bc3841793440105aae766bbe b/sql/hive/src/test/resources/golden/view_inputs-1-5af97e73bc3841793440105aae766bbe new file mode 100644 index 000..e69de29 http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db -- diff --git a/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db b/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db new file mode 100644 index 000..1f3d8a7 --- /dev/null +++ b/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db @@ -0,0 +1 @@ +1028 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4245][SQL] Fix containsNull of the result ArrayType of CreateArray expression.
Repository: spark Updated Branches: refs/heads/branch-1.2 51b053a31 - 4bdeeb7d2 [SPARK-4245][SQL] Fix containsNull of the result ArrayType of CreateArray expression. The `containsNull` of the result `ArrayType` of `CreateArray` should be `true` only if the children is empty or there exists nullable child. Author: Takuya UESHIN ues...@happy-camper.st Closes #3110 from ueshin/issues/SPARK-4245 and squashes the following commits: 6f64746 [Takuya UESHIN] Move equalsIgnoreNullability method into DataType. 5a90e02 [Takuya UESHIN] Refine InsertIntoHiveType and add some comments. cbecba8 [Takuya UESHIN] Fix a test title. 884ec37 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-4245 3c5274b [Takuya UESHIN] Add tests to insert data of types ArrayType / MapType / StructType with nullability is false into Hive table. 41a94a9 [Takuya UESHIN] Replace InsertIntoTable with InsertIntoHiveTable if data types ignoring nullability are same. 43e6ef5 [Takuya UESHIN] Fix containsNull for empty array. 778e997 [Takuya UESHIN] Fix containsNull of the result ArrayType of CreateArray expression. (cherry picked from commit bbd8f5bee81d5788c356977c173dd1edc42c77a3) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bdeeb7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bdeeb7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bdeeb7d Branch: refs/heads/branch-1.2 Commit: 4bdeeb7d25453b9b50c7dc23a5c7f588754f0e52 Parents: 51b053a Author: Takuya UESHIN ues...@happy-camper.st Authored: Fri Nov 14 14:21:16 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 14:21:32 2014 -0800 -- .../sql/catalyst/expressions/complexTypes.scala | 4 +- .../spark/sql/catalyst/types/dataTypes.scala| 21 .../spark/sql/hive/HiveMetastoreCatalog.scala | 27 +++ .../apache/spark/sql/hive/HiveStrategies.scala | 6 ++- .../sql/hive/InsertIntoHiveTableSuite.scala | 50 5 files changed, 106 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4bdeeb7d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala index 19421e5..917b346 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala @@ -115,7 +115,9 @@ case class CreateArray(children: Seq[Expression]) extends Expression { override def dataType: DataType = { assert(resolved, sInvalid dataType of mixed ArrayType ${childTypes.mkString(,)}) -ArrayType(childTypes.headOption.getOrElse(NullType)) +ArrayType( + childTypes.headOption.getOrElse(NullType), + containsNull = children.exists(_.nullable)) } override def nullable: Boolean = false http://git-wip-us.apache.org/repos/asf/spark/blob/4bdeeb7d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 5dd19dd..ff1dc03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -171,6 +171,27 @@ object DataType { case _ = } } + + /** + * Compares two types, ignoring nullability of ArrayType, MapType, StructType. + */ + def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = { +(left, right) match { + case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) = +equalsIgnoreNullability(leftElementType, rightElementType) + case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, rightValueType, _)) = +equalsIgnoreNullability(leftKeyType, rightKeyType) +equalsIgnoreNullability(leftValueType, rightValueType) + case (StructType(leftFields), StructType(rightFields)) = +leftFields.size == rightFields.size +leftFields.zip(rightFields) + .forall{ +case (left, right) = + left.name == right.name equalsIgnoreNullability(left.dataType, right.dataType) + } + case (left, right) = left == right +} + } } abstract class DataType {
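The patch above introduces `DataType.equalsIgnoreNullability` so that types differing only in nullability (for example, the `containsNull` flag of an `ArrayType`) compare as equal when deciding whether an insert plan matches the Hive table schema. A small hypothetical usage sketch against the Spark 1.2 catalyst API shown here (these types later moved to `org.apache.spark.sql.types` in newer releases):

```scala
// Hypothetical usage sketch against the Spark 1.2 catalyst API shown in this patch.
import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StringType}

object NullabilitySketch {
  def main(args: Array[String]): Unit = {
    val declared = ArrayType(StringType, containsNull = false)
    val produced = ArrayType(StringType, containsNull = true)

    // Plain equality still distinguishes the two types.
    println(declared == produced)                                  // false
    // The helper added by this change ignores the nullability flag.
    println(DataType.equalsIgnoreNullability(declared, produced))  // true
  }
}
```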
spark git commit: SPARK-4375. no longer require -Pscala-2.10
Repository: spark Updated Branches: refs/heads/master bbd8f5bee - f5f757e4e SPARK-4375. no longer require -Pscala-2.10 It seems like the winds might have moved away from this approach, but wanted to post the PR anyway because I got it working and to show what it would look like. Author: Sandy Ryza sa...@cloudera.com Closes #3239 from sryza/sandy-spark-4375 and squashes the following commits: 0ffbe95 [Sandy Ryza] Enable -Dscala-2.11 in sbt cd42d94 [Sandy Ryza] Update doc f6644c3 [Sandy Ryza] SPARK-4375 take 2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5f757e4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5f757e4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5f757e4 Branch: refs/heads/master Commit: f5f757e4ed80759dc5668c63d5663651689f8da8 Parents: bbd8f5b Author: Sandy Ryza sa...@cloudera.com Authored: Fri Nov 14 14:21:57 2014 -0800 Committer: Patrick Wendell pwend...@gmail.com Committed: Fri Nov 14 14:21:57 2014 -0800 -- docs/building-spark.md | 4 +- examples/pom.xml | 65 ++- pom.xml | 22 +--- project/SparkBuild.scala | 3 ++ repl/pom.xml | 117 +++--- sql/catalyst/pom.xml | 11 +--- sql/hive/pom.xml | 3 -- 7 files changed, 54 insertions(+), 171 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5f757e4/docs/building-spark.md -- diff --git a/docs/building-spark.md b/docs/building-spark.md index 20ba7da..bb18414 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -113,9 +113,9 @@ mvn -Pyarn -Phive -Phive-thriftserver-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 {% endhighlight %} # Building for Scala 2.11 -To produce a Spark package compiled with Scala 2.11, use the `-Pscala-2.11` profile: +To produce a Spark package compiled with Scala 2.11, use the `-Dscala-2.11` property: -mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package +mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package Scala 2.11 support in Spark is experimental and does not support a few features. Specifically, Spark's external Kafka library and JDBC component are not yet http://git-wip-us.apache.org/repos/asf/spark/blob/f5f757e4/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 2ec5728..2752ce3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -389,11 +389,11 @@ /properties /profile profile - !-- We add source directories specific to Scala 2.10 and 2.11 since some examples - work only in one and not the other -- + !-- We add a source directory specific to Scala 2.10 since Kafka and Algebird + only work with it -- idscala-2.10/id activation -activeByDefaulttrue/activeByDefault +propertyname!scala-2.11/name/property /activation dependencies dependency @@ -427,65 +427,6 @@ /sources /configuration /execution - execution -idadd-scala-test-sources/id -phasegenerate-test-sources/phase -goals - goaladd-test-source/goal -/goals -configuration - sources -sourcesrc/test/scala/source -sourcescala-2.10/src/test/scala/source -sourcescala-2.10/src/test/java/source - /sources -/configuration - /execution -/executions - /plugin -/plugins - /build -/profile -profile - idscala-2.11/id - activation -activeByDefaultfalse/activeByDefault - /activation - dependencies -!-- Streaming Kafka and zeromq modules are disabled for now. 
-- - /dependencies - build -plugins - plugin -groupIdorg.codehaus.mojo/groupId -artifactIdbuild-helper-maven-plugin/artifactId -executions - execution -idadd-scala-sources/id -phasegenerate-sources/phase -goals - goaladd-source/goal -/goals -configuration - sources -sourcesrc/main/scala/source -sourcescala-2.11/src/main/scala/source - /sources -/configuration - /execution - execution -idadd-scala-test-sources/id -phasegenerate-test-sources/phase -goals
spark git commit: SPARK-4375. no longer require -Pscala-2.10
Repository: spark Updated Branches: refs/heads/branch-1.2 4bdeeb7d2 - d90ddf12b SPARK-4375. no longer require -Pscala-2.10 It seems like the winds might have moved away from this approach, but wanted to post the PR anyway because I got it working and to show what it would look like. Author: Sandy Ryza sa...@cloudera.com Closes #3239 from sryza/sandy-spark-4375 and squashes the following commits: 0ffbe95 [Sandy Ryza] Enable -Dscala-2.11 in sbt cd42d94 [Sandy Ryza] Update doc f6644c3 [Sandy Ryza] SPARK-4375 take 2 (cherry picked from commit f5f757e4ed80759dc5668c63d5663651689f8da8) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d90ddf12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d90ddf12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d90ddf12 Branch: refs/heads/branch-1.2 Commit: d90ddf12b6bea2162e982e800c96d2c94f66b347 Parents: 4bdeeb7 Author: Sandy Ryza sa...@cloudera.com Authored: Fri Nov 14 14:21:57 2014 -0800 Committer: Patrick Wendell pwend...@gmail.com Committed: Fri Nov 14 14:22:13 2014 -0800 -- docs/building-spark.md | 4 +- examples/pom.xml | 65 ++- pom.xml | 22 +--- project/SparkBuild.scala | 3 ++ repl/pom.xml | 117 +++--- sql/catalyst/pom.xml | 11 +--- sql/hive/pom.xml | 3 -- 7 files changed, 54 insertions(+), 171 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d90ddf12/docs/building-spark.md -- diff --git a/docs/building-spark.md b/docs/building-spark.md index 20ba7da..bb18414 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -113,9 +113,9 @@ mvn -Pyarn -Phive -Phive-thriftserver-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 {% endhighlight %} # Building for Scala 2.11 -To produce a Spark package compiled with Scala 2.11, use the `-Pscala-2.11` profile: +To produce a Spark package compiled with Scala 2.11, use the `-Dscala-2.11` property: -mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package +mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package Scala 2.11 support in Spark is experimental and does not support a few features. Specifically, Spark's external Kafka library and JDBC component are not yet http://git-wip-us.apache.org/repos/asf/spark/blob/d90ddf12/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 2ec5728..2752ce3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -389,11 +389,11 @@ /properties /profile profile - !-- We add source directories specific to Scala 2.10 and 2.11 since some examples - work only in one and not the other -- + !-- We add a source directory specific to Scala 2.10 since Kafka and Algebird + only work with it -- idscala-2.10/id activation -activeByDefaulttrue/activeByDefault +propertyname!scala-2.11/name/property /activation dependencies dependency @@ -427,65 +427,6 @@ /sources /configuration /execution - execution -idadd-scala-test-sources/id -phasegenerate-test-sources/phase -goals - goaladd-test-source/goal -/goals -configuration - sources -sourcesrc/test/scala/source -sourcescala-2.10/src/test/scala/source -sourcescala-2.10/src/test/java/source - /sources -/configuration - /execution -/executions - /plugin -/plugins - /build -/profile -profile - idscala-2.11/id - activation -activeByDefaultfalse/activeByDefault - /activation - dependencies -!-- Streaming Kafka and zeromq modules are disabled for now. 
-- - /dependencies - build -plugins - plugin -groupIdorg.codehaus.mojo/groupId -artifactIdbuild-helper-maven-plugin/artifactId -executions - execution -idadd-scala-sources/id -phasegenerate-sources/phase -goals - goaladd-source/goal -/goals -configuration - sources -sourcesrc/main/scala/source -sourcescala-2.11/src/main/scala/source - /sources -/configuration - /execution -
spark git commit: [SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor
Repository: spark Updated Branches: refs/heads/master f5f757e4e - 0cbdb01e1 [SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor When iterator of RuleExecutor breaks, the num of iterator should be (iteration - 1) not (iteration ).Because log looks like Fixed point reached for batch ${batch.name} after 3 iterations., but it did 2 iterations really! Author: DoingDone9 799203...@qq.com Closes #3180 from DoingDone9/issue_01 and squashes the following commits: 571e2ed [DoingDone9] Update RuleExecutor.scala 46514b6 [DoingDone9] When iterator of RuleExecutor breaks, the num of iterator should be iteration - 1 not iteration. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cbdb01e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cbdb01e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cbdb01e Branch: refs/heads/master Commit: 0cbdb01e1c817e71c4f80de05c4e5bb11510b368 Parents: f5f757e Author: DoingDone9 799203...@qq.com Authored: Fri Nov 14 14:28:06 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 14:28:06 2014 -0800 -- .../scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0cbdb01e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index d192b15..c441f0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -79,7 +79,8 @@ abstract class RuleExecutor[TreeType : TreeNode[_]] extends Logging { } if (curPlan.fastEquals(lastPlan)) { - logTrace(sFixed point reached for batch ${batch.name} after $iteration iterations.) + logTrace( +sFixed point reached for batch ${batch.name} after ${iteration - 1} iterations.) continue = false } lastPlan = curPlan - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
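Because `iteration` is incremented before the fixed-point check, the batch has actually executed `iteration - 1` times by the time the plan stops changing. A simplified standalone loop, not the real `RuleExecutor` code, that shows the off-by-one:

```scala
// Simplified stand-in for the RuleExecutor loop, only to show the off-by-one:
// `iteration` is bumped before the fixed-point check, so when the plan stops
// changing the batch has actually run (iteration - 1) times.
object FixedPointSketch {
  def main(args: Array[String]): Unit = {
    def applyBatch(plan: Int): Int = math.min(plan + 1, 3) // toy rule that converges at 3

    var curPlan = 0
    var lastPlan = curPlan
    var iteration = 1
    var continue = true
    while (continue) {
      curPlan = applyBatch(curPlan)
      iteration += 1
      if (curPlan == lastPlan) {
        // Four applications of the batch, correctly logged as "after 4 iterations".
        println(s"Fixed point reached after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }
  }
}
```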
spark git commit: [SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor
Repository: spark Updated Branches: refs/heads/branch-1.2 d90ddf12b - f8810b6a5 [SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor When iterator of RuleExecutor breaks, the num of iterator should be (iteration - 1) not (iteration ).Because log looks like Fixed point reached for batch ${batch.name} after 3 iterations., but it did 2 iterations really! Author: DoingDone9 799203...@qq.com Closes #3180 from DoingDone9/issue_01 and squashes the following commits: 571e2ed [DoingDone9] Update RuleExecutor.scala 46514b6 [DoingDone9] When iterator of RuleExecutor breaks, the num of iterator should be iteration - 1 not iteration. (cherry picked from commit 0cbdb01e1c817e71c4f80de05c4e5bb11510b368) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8810b6a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8810b6a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8810b6a Branch: refs/heads/branch-1.2 Commit: f8810b6a572f314ab0b88899172d8fa2b78e014f Parents: d90ddf1 Author: DoingDone9 799203...@qq.com Authored: Fri Nov 14 14:28:06 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 14:28:19 2014 -0800 -- .../scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f8810b6a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index d192b15..c441f0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -79,7 +79,8 @@ abstract class RuleExecutor[TreeType : TreeNode[_]] extends Logging { } if (curPlan.fastEquals(lastPlan)) { - logTrace(sFixed point reached for batch ${batch.name} after $iteration iterations.) + logTrace( +sFixed point reached for batch ${batch.name} after ${iteration - 1} iterations.) continue = false } lastPlan = curPlan - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector
Repository: spark Updated Branches: refs/heads/branch-1.2 f8810b6a5 - 5b63158ac [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in Spark Streaming is enabled. Details and design doc can be seen in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062). Author: jerryshao saisai.s...@intel.com Author: Tathagata Das tathagata.das1...@gmail.com Author: Saisai Shao saisai.s...@intel.com Closes #2991 from jerryshao/kafka-refactor and squashes the following commits: 5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3 eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust. fab14c7 [Tathagata Das] minor update. 149948b [Tathagata Das] Fixed mistake 14630aa [Tathagata Das] Minor updates. d9a452c [Tathagata Das] Minor updates. ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design. 2a20a01 [jerryshao] Address some comments 9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites e501b3c [jerryshao] Add Mima excludes b798535 [jerryshao] Fix the missed issue e5e21c1 [jerryshao] Change to while loop ea873e4 [jerryshao] Further address the comments 98f3d07 [jerryshao] Fix comment style 4854ee9 [jerryshao] Address all the comments 96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test 8135d31 [jerryshao] Fix flaky test a949741 [jerryshao] Address the comments 16bfe78 [jerryshao] Change the ordering of imports 0894aef [jerryshao] Add some comments 77c3e50 [jerryshao] Code refactor and add some unit tests dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver (cherry picked from commit 5930f64bf0d2516304b21bd49eac361a54caabdd) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b63158a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b63158a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b63158a Branch: refs/heads/branch-1.2 Commit: 5b63158ac2100627ae4a77f3a89ae038e5b6be90 Parents: f8810b6 Author: jerryshao saisai.s...@intel.com Authored: Fri Nov 14 14:33:37 2014 -0800 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Fri Nov 14 14:33:56 2014 -0800 -- .../streaming/kafka/KafkaInputDStream.scala | 33 ++- .../spark/streaming/kafka/KafkaUtils.scala | 4 +- .../streaming/kafka/ReliableKafkaReceiver.scala | 282 +++ .../streaming/kafka/JavaKafkaStreamSuite.java | 44 +-- .../streaming/kafka/KafkaStreamSuite.scala | 216 +++--- .../kafka/ReliableKafkaStreamSuite.scala| 140 + project/MimaExcludes.scala | 4 + .../streaming/receiver/BlockGenerator.scala | 55 +++- .../receiver/ReceiverSupervisorImpl.scala | 8 +- .../apache/spark/streaming/ReceiverSuite.scala | 8 +- 10 files changed, 651 insertions(+), 143 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b63158a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 28ac592..4d26b64 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -17,13 +17,12 @@ package org.apache.spark.streaming.kafka +import java.util.Properties + import scala.collection.Map import scala.reflect.{classTag, ClassTag} -import java.util.Properties -import java.util.concurrent.Executors - -import kafka.consumer._ +import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} import kafka.serializer.Decoder import kafka.utils.VerifiableProperties @@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.Utils /** * Input stream that pulls messages from a Kafka Broker. @@ -51,12 +51,16 @@ class KafkaInputDStream[ @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], +useReliableReceiver: Boolean, storageLevel: StorageLevel ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging { def getReceiver(): Receiver[(K, V)]
spark git commit: [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector
Repository: spark Updated Branches: refs/heads/master 0cbdb01e1 - 5930f64bf [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in Spark Streaming is enabled. Details and design doc can be seen in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062). Author: jerryshao saisai.s...@intel.com Author: Tathagata Das tathagata.das1...@gmail.com Author: Saisai Shao saisai.s...@intel.com Closes #2991 from jerryshao/kafka-refactor and squashes the following commits: 5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3 eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust. fab14c7 [Tathagata Das] minor update. 149948b [Tathagata Das] Fixed mistake 14630aa [Tathagata Das] Minor updates. d9a452c [Tathagata Das] Minor updates. ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design. 2a20a01 [jerryshao] Address some comments 9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites e501b3c [jerryshao] Add Mima excludes b798535 [jerryshao] Fix the missed issue e5e21c1 [jerryshao] Change to while loop ea873e4 [jerryshao] Further address the comments 98f3d07 [jerryshao] Fix comment style 4854ee9 [jerryshao] Address all the comments 96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test 8135d31 [jerryshao] Fix flaky test a949741 [jerryshao] Address the comments 16bfe78 [jerryshao] Change the ordering of imports 0894aef [jerryshao] Add some comments 77c3e50 [jerryshao] Code refactor and add some unit tests dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5930f64b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5930f64b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5930f64b Branch: refs/heads/master Commit: 5930f64bf0d2516304b21bd49eac361a54caabdd Parents: 0cbdb01 Author: jerryshao saisai.s...@intel.com Authored: Fri Nov 14 14:33:37 2014 -0800 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Fri Nov 14 14:33:37 2014 -0800 -- .../streaming/kafka/KafkaInputDStream.scala | 33 ++- .../spark/streaming/kafka/KafkaUtils.scala | 4 +- .../streaming/kafka/ReliableKafkaReceiver.scala | 282 +++ .../streaming/kafka/JavaKafkaStreamSuite.java | 44 +-- .../streaming/kafka/KafkaStreamSuite.scala | 216 +++--- .../kafka/ReliableKafkaStreamSuite.scala| 140 + project/MimaExcludes.scala | 4 + .../streaming/receiver/BlockGenerator.scala | 55 +++- .../receiver/ReceiverSupervisorImpl.scala | 8 +- .../apache/spark/streaming/ReceiverSuite.scala | 8 +- 10 files changed, 651 insertions(+), 143 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 28ac592..4d26b64 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -17,13 +17,12 @@ package org.apache.spark.streaming.kafka +import java.util.Properties + 
import scala.collection.Map import scala.reflect.{classTag, ClassTag} -import java.util.Properties -import java.util.concurrent.Executors - -import kafka.consumer._ +import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} import kafka.serializer.Decoder import kafka.utils.VerifiableProperties @@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.Utils /** * Input stream that pulls messages from a Kafka Broker. @@ -51,12 +51,16 @@ class KafkaInputDStream[ @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], +useReliableReceiver: Boolean, storageLevel: StorageLevel ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging { def getReceiver(): Receiver[(K, V)] = { -new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) -.asInstanceOf[Receiver[(K, V)]] +if
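Applications do not construct ReliableKafkaReceiver directly; KafkaUtils switches to it when the streaming write ahead log is turned on. A minimal sketch of how a job would opt in — the ZooKeeper host, topic, and checkpoint directory are placeholders, and the WAL config key is stated as an assumption for this release:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// The reliable receiver is selected internally once the streaming WAL is on.
val conf = new SparkConf()
  .setAppName("ReliableKafkaWordCount")
  // Assumed config key for enabling the receiver write ahead log in this release.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")  // placeholder checkpoint dir

// Replicated storage plus the WAL is what makes received data recoverable.
val lines = KafkaUtils.createStream(
  ssc, "zkhost:2181", "consumer-group", Map("topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

lines.count().print()
ssc.start()
ssc.awaitTermination()
```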
spark git commit: [SPARK-4390][SQL] Handle NaN cast to decimal correctly
Repository: spark Updated Branches: refs/heads/branch-1.2 5b63158ac - 0dd924178 [SPARK-4390][SQL] Handle NaN cast to decimal correctly Author: Michael Armbrust mich...@databricks.com Closes #3256 from marmbrus/NanDecimal and squashes the following commits: 4c3ba46 [Michael Armbrust] fix style d360f83 [Michael Armbrust] Handle NaN cast to decimal (cherry picked from commit a0300ea32a9d92bd51c72930bc3979087b0082b2) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dd92417 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dd92417 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dd92417 Branch: refs/heads/branch-1.2 Commit: 0dd9241783b01815b68059067c72f36b8d05dddf Parents: 5b63158 Author: Michael Armbrust mich...@databricks.com Authored: Fri Nov 14 14:56:57 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 14:57:09 2014 -0800 -- .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 6 +- .../golden/NaN to Decimal-0-6ca781bc343025635d72321ef0a9d425 | 1 + .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 3 +++ 3 files changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0dd92417/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 55319e7..34697a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -290,7 +290,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case LongType = b = changePrecision(Decimal(b.asInstanceOf[Long]), target) case x: NumericType = // All other numeric types can be represented precisely as Doubles - b = changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), target) + b = try { + changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), target) + } catch { +case _: NumberFormatException = null + } } // DoubleConverter http://git-wip-us.apache.org/repos/asf/spark/blob/0dd92417/sql/hive/src/test/resources/golden/NaN to Decimal-0-6ca781bc343025635d72321ef0a9d425 -- diff --git a/sql/hive/src/test/resources/golden/NaN to Decimal-0-6ca781bc343025635d72321ef0a9d425 b/sql/hive/src/test/resources/golden/NaN to Decimal-0-6ca781bc343025635d72321ef0a9d425 new file mode 100644 index 000..7951def --- /dev/null +++ b/sql/hive/src/test/resources/golden/NaN to Decimal-0-6ca781bc343025635d72321ef0a9d425 @@ -0,0 +1 @@ +NULL http://git-wip-us.apache.org/repos/asf/spark/blob/0dd92417/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 684d228..0dd766f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -56,6 +56,9 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { Locale.setDefault(originalLocale) } + createQueryTest(NaN to Decimal, +SELECT CAST(CAST('NaN' AS DOUBLE) AS DECIMAL(1,1)) FROM 
src LIMIT 1) + createQueryTest(constant null testing, SELECT |IF(FALSE, CAST(NULL AS STRING), CAST(1 AS STRING)) AS COL1,
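The guarded conversion mirrors what happens outside Catalyst: building a BigDecimal from NaN (or an infinity) throws NumberFormatException, which the new catch clause turns into a SQL NULL. A small standalone sketch of that pattern:

```scala
// Why the cast needs a guard: NaN and the infinities cannot be represented as a
// BigDecimal, so the conversion throws NumberFormatException.
def toDecimalOrNull(d: Double): java.math.BigDecimal =
  try {
    new java.math.BigDecimal(d)          // same failure mode the Cast expression hits
  } catch {
    case _: NumberFormatException => null  // SQL semantics: the cast yields NULL
  }

toDecimalOrNull(1.5)         // => 1.5
toDecimalOrNull(Double.NaN)  // => null, instead of failing the query
```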
spark git commit: [SPARK-4391][SQL] Configure parquet filters using SQLConf
Repository: spark Updated Branches: refs/heads/master a0300ea32 - e47c38763 [SPARK-4391][SQL] Configure parquet filters using SQLConf This is more uniform with the rest of SQL configuration and allows it to be turned on and off without restarting the SparkContext. In this PR I also turn off filter pushdown by default due to a number of outstanding issues (in particular SPARK-4258). When those are fixed we should turn it back on by default. Author: Michael Armbrust mich...@databricks.com Closes #3258 from marmbrus/parquetFilters and squashes the following commits: 5655bfe [Michael Armbrust] Remove extra line. 15e9a98 [Michael Armbrust] Enable filters for tests 75afd39 [Michael Armbrust] Fix comments 78fa02d [Michael Armbrust] off by default e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter pushdown Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e47c3876 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e47c3876 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e47c3876 Branch: refs/heads/master Commit: e47c38763914aaf89a7a851c5f41b7549a75615b Parents: a0300ea Author: Michael Armbrust mich...@databricks.com Authored: Fri Nov 14 14:59:35 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 14:59:35 2014 -0800 -- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 8 +++- .../apache/spark/sql/execution/SparkStrategies.scala | 7 +-- .../org/apache/spark/sql/parquet/ParquetFilters.scala | 2 -- .../spark/sql/parquet/ParquetTableOperations.scala | 13 +++-- .../apache/spark/sql/parquet/ParquetQuerySuite.scala | 2 ++ 5 files changed, 21 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 279495a..cd7d78e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConversions._ import java.util.Properties - private[spark] object SQLConf { val COMPRESS_CACHED = spark.sql.inMemoryColumnarStorage.compressed val COLUMN_BATCH_SIZE = spark.sql.inMemoryColumnarStorage.batchSize @@ -32,9 +31,12 @@ private[spark] object SQLConf { val SHUFFLE_PARTITIONS = spark.sql.shuffle.partitions val CODEGEN_ENABLED = spark.sql.codegen val DIALECT = spark.sql.dialect + val PARQUET_BINARY_AS_STRING = spark.sql.parquet.binaryAsString val PARQUET_CACHE_METADATA = spark.sql.parquet.cacheMetadata val PARQUET_COMPRESSION = spark.sql.parquet.compression.codec + val PARQUET_FILTER_PUSHDOWN_ENABLED = spark.sql.parquet.filterPushdown + val COLUMN_NAME_OF_CORRUPT_RECORD = spark.sql.columnNameOfCorruptRecord // This is only used for the thriftserver @@ -90,6 +92,10 @@ private[sql] trait SQLConf { /** Number of partitions to use for shuffle operators. */ private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 200).toInt + /** When true predicates will be passed to the parquet record reader when possible. */ + private[spark] def parquetFilterPushDown = +getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, false).toBoolean + /** * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode * that evaluates expressions found in queries. 
In general this custom code runs much faster http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cc7e0c0..03cd5bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) = val prunePushedDownFilters = - if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { + if (sqlContext.parquetFilterPushDown) { (filters: Seq[Expression]) = { filters.filter { filter =
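Because the flag now goes through SQLConf, it can be flipped on a live SQLContext instead of being baked into SparkConf before the context starts. A hedged usage sketch — the Parquet path, table name, and query column are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("pushdown-demo").setMaster("local"))
val sqlContext = new SQLContext(sc)

// Off by default for now (see SPARK-4258); flip it per session when it is safe.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

// Hypothetical Parquet path and schema; with pushdown on, the predicate below
// can be evaluated inside the Parquet record reader rather than by Spark.
val events = sqlContext.parquetFile("/data/events.parquet")
events.registerTempTable("events")
sqlContext.sql("SELECT * FROM events WHERE id > 100").collect()
```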
spark git commit: [SPARK-4391][SQL] Configure parquet filters using SQLConf
Repository: spark Updated Branches: refs/heads/branch-1.2 0dd924178 - 576688aa2 [SPARK-4391][SQL] Configure parquet filters using SQLConf This is more uniform with the rest of SQL configuration and allows it to be turned on and off without restarting the SparkContext. In this PR I also turn off filter pushdown by default due to a number of outstanding issues (in particular SPARK-4258). When those are fixed we should turn it back on by default. Author: Michael Armbrust mich...@databricks.com Closes #3258 from marmbrus/parquetFilters and squashes the following commits: 5655bfe [Michael Armbrust] Remove extra line. 15e9a98 [Michael Armbrust] Enable filters for tests 75afd39 [Michael Armbrust] Fix comments 78fa02d [Michael Armbrust] off by default e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter pushdown (cherry picked from commit e47c38763914aaf89a7a851c5f41b7549a75615b) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/576688aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/576688aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/576688aa Branch: refs/heads/branch-1.2 Commit: 576688aa2a19bd4ba239a2b93af7947f983e5124 Parents: 0dd9241 Author: Michael Armbrust mich...@databricks.com Authored: Fri Nov 14 14:59:35 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 14:59:45 2014 -0800 -- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 8 +++- .../apache/spark/sql/execution/SparkStrategies.scala | 7 +-- .../org/apache/spark/sql/parquet/ParquetFilters.scala | 2 -- .../spark/sql/parquet/ParquetTableOperations.scala | 13 +++-- .../apache/spark/sql/parquet/ParquetQuerySuite.scala | 2 ++ 5 files changed, 21 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/576688aa/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 279495a..cd7d78e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConversions._ import java.util.Properties - private[spark] object SQLConf { val COMPRESS_CACHED = spark.sql.inMemoryColumnarStorage.compressed val COLUMN_BATCH_SIZE = spark.sql.inMemoryColumnarStorage.batchSize @@ -32,9 +31,12 @@ private[spark] object SQLConf { val SHUFFLE_PARTITIONS = spark.sql.shuffle.partitions val CODEGEN_ENABLED = spark.sql.codegen val DIALECT = spark.sql.dialect + val PARQUET_BINARY_AS_STRING = spark.sql.parquet.binaryAsString val PARQUET_CACHE_METADATA = spark.sql.parquet.cacheMetadata val PARQUET_COMPRESSION = spark.sql.parquet.compression.codec + val PARQUET_FILTER_PUSHDOWN_ENABLED = spark.sql.parquet.filterPushdown + val COLUMN_NAME_OF_CORRUPT_RECORD = spark.sql.columnNameOfCorruptRecord // This is only used for the thriftserver @@ -90,6 +92,10 @@ private[sql] trait SQLConf { /** Number of partitions to use for shuffle operators. */ private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 200).toInt + /** When true predicates will be passed to the parquet record reader when possible. 
*/ + private[spark] def parquetFilterPushDown = +getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, false).toBoolean + /** * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode * that evaluates expressions found in queries. In general this custom code runs much faster http://git-wip-us.apache.org/repos/asf/spark/blob/576688aa/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cc7e0c0..03cd5bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) = val prunePushedDownFilters = - if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { +
spark git commit: [SQL] Minor cleanup of comments, errors and override.
Repository: spark Updated Branches: refs/heads/master e47c38763 - f805025e8 [SQL] Minor cleanup of comments, errors and override. Author: Michael Armbrust mich...@databricks.com Closes #3257 from marmbrus/minorCleanup and squashes the following commits: d8b5abc [Michael Armbrust] Use interpolation. 2fdf903 [Michael Armbrust] Better error message when coalesce can't be resolved. f9fa6cf [Michael Armbrust] Methods in a final class do not also need to be final, use override. 199fd98 [Michael Armbrust] Fix typo Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f805025e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f805025e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f805025e Branch: refs/heads/master Commit: f805025e8efe9cd522e8875141ec27df8d16bbe0 Parents: e47c387 Author: Michael Armbrust mich...@databricks.com Authored: Fri Nov 14 15:00:42 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:00:42 2014 -0800 -- .../spark/sql/catalyst/expressions/aggregates.scala | 2 +- .../expressions/codegen/GenerateProjection.scala| 16 .../sql/catalyst/expressions/nullFunctions.scala| 4 +++- 3 files changed, 12 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f805025e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 2b364fc..3ceb5ec 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -304,7 +304,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN child.dataType match { case DecimalType.Fixed(_, _) = -// Turn the results to unlimited decimals for the divsion, before going back to fixed +// Turn the results to unlimited decimals for the division, before going back to fixed val castedSum = Cast(Sum(partialSum.toAttribute), DecimalType.Unlimited) val castedCount = Cast(Sum(partialCount.toAttribute), DecimalType.Unlimited) SplitEvaluation( http://git-wip-us.apache.org/repos/asf/spark/blob/f805025e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala index 7871a62..2ff6116 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala @@ -53,8 +53,8 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val nullFunctions = q private[this] var nullBits = new Array[Boolean](${expressions.size}) -final def setNullAt(i: Int) = { nullBits(i) = true } -final def isNullAt(i: Int) = nullBits(i) +override def setNullAt(i: Int) = { nullBits(i) = true } +override def isNullAt(i: Int) = nullBits(i) .children val tupleElements = expressions.zipWithIndex.flatMap { @@ -82,7 +82,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val iLit = ru.Literal(Constant(i)) 
qif(isNullAt($iLit)) { null } else { ${newTermName(sc$i)} } } - qfinal def iterator = Iterator[Any](..$allColumns) + qoverride def iterator = Iterator[Any](..$allColumns) } val accessorFailure = qscala.sys.error(Invalid ordinal: + i) @@ -94,7 +94,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { qif(i == $ordinal) { if(isNullAt($i)) return null else return $elementName } } - qfinal def apply(i: Int): Any = { ..$cases; $accessorFailure } + qoverride def apply(i: Int): Any = { ..$cases; $accessorFailure } } val updateFunction = { @@ -114,7 +114,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { return } } - qfinal def update(i: Int, value: Any): Unit = { ..$cases; $accessorFailure } + qoverride def update(i: Int, value: Any): Unit = { ..$cases; $accessorFailure } } val
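The `final`-to-`override` change in GenerateProjection reflects the general Scala rule that members of a final class cannot be overridden anyway, so `override` is the modifier that actually carries information. A toy illustration, with hypothetical class names standing in for the generated code:

```scala
// Members of a final class cannot be overridden, so `final def` adds nothing;
// `override def` documents the relationship to the parent type.
abstract class BaseProjection {
  def apply(i: Int): Any
  def isNullAt(i: Int): Boolean
}

final class GeneratedProjectionExample extends BaseProjection {
  private[this] val nullBits = new Array[Boolean](4)
  override def isNullAt(i: Int): Boolean = nullBits(i)
  override def apply(i: Int): Any = if (isNullAt(i)) null else i * 2  // toy body
}
```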
spark git commit: [SQL] Minor cleanup of comments, errors and override.
Repository: spark Updated Branches: refs/heads/branch-1.2 576688aa2 - e35672e7e [SQL] Minor cleanup of comments, errors and override. Author: Michael Armbrust mich...@databricks.com Closes #3257 from marmbrus/minorCleanup and squashes the following commits: d8b5abc [Michael Armbrust] Use interpolation. 2fdf903 [Michael Armbrust] Better error message when coalesce can't be resolved. f9fa6cf [Michael Armbrust] Methods in a final class do not also need to be final, use override. 199fd98 [Michael Armbrust] Fix typo (cherry picked from commit f805025e8efe9cd522e8875141ec27df8d16bbe0) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e35672e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e35672e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e35672e7 Branch: refs/heads/branch-1.2 Commit: e35672e7edeb7f68bece12d3d656419d3e610e95 Parents: 576688a Author: Michael Armbrust mich...@databricks.com Authored: Fri Nov 14 15:00:42 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:00:51 2014 -0800 -- .../spark/sql/catalyst/expressions/aggregates.scala | 2 +- .../expressions/codegen/GenerateProjection.scala| 16 .../sql/catalyst/expressions/nullFunctions.scala| 4 +++- 3 files changed, 12 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e35672e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 2b364fc..3ceb5ec 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -304,7 +304,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN child.dataType match { case DecimalType.Fixed(_, _) = -// Turn the results to unlimited decimals for the divsion, before going back to fixed +// Turn the results to unlimited decimals for the division, before going back to fixed val castedSum = Cast(Sum(partialSum.toAttribute), DecimalType.Unlimited) val castedCount = Cast(Sum(partialCount.toAttribute), DecimalType.Unlimited) SplitEvaluation( http://git-wip-us.apache.org/repos/asf/spark/blob/e35672e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala index 7871a62..2ff6116 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala @@ -53,8 +53,8 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val nullFunctions = q private[this] var nullBits = new Array[Boolean](${expressions.size}) -final def setNullAt(i: Int) = { nullBits(i) = true } -final def isNullAt(i: Int) = nullBits(i) +override def setNullAt(i: Int) = { nullBits(i) = true } +override def isNullAt(i: Int) = nullBits(i) .children val tupleElements = expressions.zipWithIndex.flatMap { @@ -82,7 +82,7 
@@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val iLit = ru.Literal(Constant(i)) qif(isNullAt($iLit)) { null } else { ${newTermName(sc$i)} } } - qfinal def iterator = Iterator[Any](..$allColumns) + qoverride def iterator = Iterator[Any](..$allColumns) } val accessorFailure = qscala.sys.error(Invalid ordinal: + i) @@ -94,7 +94,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { qif(i == $ordinal) { if(isNullAt($i)) return null else return $elementName } } - qfinal def apply(i: Int): Any = { ..$cases; $accessorFailure } + qoverride def apply(i: Int): Any = { ..$cases; $accessorFailure } } val updateFunction = { @@ -114,7 +114,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { return } } - qfinal def update(i: Int, value: Any): Unit = { ..$cases;
spark git commit: [SQL] Don't shuffle code generated rows
Repository: spark Updated Branches: refs/heads/branch-1.2 e35672e7e - 680bc0619 [SQL] Don't shuffle code generated rows When sort based shuffle and code gen are on we were trying to ship the code generated rows during a shuffle. This doesn't work because the classes don't exist on the other side. Instead we now copy into a generic row before shipping. Author: Michael Armbrust mich...@databricks.com Closes #3263 from marmbrus/aggCodeGen and squashes the following commits: f6ba8cf [Michael Armbrust] fix and test (cherry picked from commit 4b4b50c9e596673c1534df97effad50d107a8007) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/680bc061 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/680bc061 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/680bc061 Branch: refs/heads/branch-1.2 Commit: 680bc06195ecdc6ff2390c55adeb637649f2c8f3 Parents: e35672e Author: Michael Armbrust mich...@databricks.com Authored: Fri Nov 14 15:03:23 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:03:45 2014 -0800 -- .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +++ 2 files changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/680bc061/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 927f400..cff7a01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una // TODO: Eliminate redundant expressions in grouping key and value. val rdd = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter = -val hashExpressions = newProjection(expressions, child.output) -iter.map(r = (hashExpressions(r), r.copy())) +val hashExpressions = newMutableProjection(expressions, child.output)() +iter.map(r = (hashExpressions(r).copy(), r.copy())) } } else { child.execute().mapPartitions { iter = http://git-wip-us.apache.org/repos/asf/spark/blob/680bc061/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8a80724..5dd777f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { 2.5) } + test(aggregation with codegen) { +val originalValue = codegenEnabled +setConf(SQLConf.CODEGEN_ENABLED, true) +sql(SELECT key FROM testData GROUP BY key).collect() +setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString) + } + test(SPARK-3176 Added Parser of SQL LAST()) { checkAnswer( sql(SELECT LAST(n) FROM lowerCaseData), - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
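The copy before the shuffle matters because the code generated row classes exist only in the JVM that compiled them; the deserializer on the receiving side of the shuffle cannot load them. A simplified sketch of the pattern — not the Exchange operator itself, and with illustrative names:

```scala
import org.apache.spark.sql.catalyst.expressions.{MutableProjection, Row}

// Materialize keys and rows into plain generic rows before they enter the
// shuffle, since the dynamically compiled row classes are not on the
// classpath of the executor that deserializes them.
def shuffleSafe(iter: Iterator[Row], keyProjection: MutableProjection): Iterator[(Row, Row)] =
  iter.map { row =>
    // copy() detaches the result from the reused/generated backing row
    (keyProjection(row).copy(), row.copy())
  }
```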
spark git commit: [SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields
Repository: spark Updated Branches: refs/heads/master 4b4b50c9e - 0c7b66bd4 [SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields While resolving struct fields, the resulted `GetField` expression is wrapped with an `Alias` to make it a named expression. Assume `a` is a struct instance with a field `b`, then `a.b` will be resolved as `Alias(GetField(a, b), b)`. Thus, for this following SQL query: ```sql SELECT a.b + 1 FROM t GROUP BY a.b + 1 ``` the grouping expression is ```scala Add(GetField(a, b), Literal(1, IntegerType)) ``` while the aggregation expression is ```scala Add(Alias(GetField(a, b), b), Literal(1, IntegerType)) ``` This mismatch makes the above SQL query fail during the both analysis and execution phases. This PR fixes this issue by removing the alias when substituting aggregation expressions. !-- Reviewable:start -- [img src=https://reviewable.io/review_button.png; height=40 alt=Review on Reviewable/](https://reviewable.io/reviews/apache/spark/3248) !-- Reviewable:end -- Author: Cheng Lian l...@databricks.com Closes #3248 from liancheng/spark-4322 and squashes the following commits: 23a46ea [Cheng Lian] Code simplification dd20a79 [Cheng Lian] Should only trim aliases around `GetField`s 7f46532 [Cheng Lian] Enables struct fields as sub expressions of grouping fields Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c7b66bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c7b66bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c7b66bd Branch: refs/heads/master Commit: 0c7b66bd449093bb5d2dafaf91d54e63e601e320 Parents: 4b4b50c Author: Cheng Lian l...@databricks.com Authored: Fri Nov 14 15:09:36 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:09:36 2014 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 27 +--- .../spark/sql/catalyst/planning/patterns.scala | 15 --- .../org/apache/spark/sql/SQLQuerySuite.scala| 12 - 3 files changed, 34 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c7b66bd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a448c79..d3b4cf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -60,7 +60,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool ResolveFunctions :: GlobalAggregates :: UnresolvedHavingClauseAttributes :: - TrimAliases :: + TrimGroupingAliases :: typeCoercionRules ++ extendedRules : _*), Batch(Check Analysis, Once, @@ -93,17 +93,10 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool /** * Removes no-op Alias expressions from the plan. 
*/ - object TrimAliases extends Rule[LogicalPlan] { + object TrimGroupingAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Aggregate(groups, aggs, child) = -Aggregate( - groups.map { -_ transform { - case Alias(c, _) = c -} - }, - aggs, - child) +Aggregate(groups.map(_.transform { case Alias(c, _) = c }), aggs, child) } } @@ -122,10 +115,15 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool case e = e.children.forall(isValidAggregateExpression) } - aggregateExprs.foreach { e = -if (!isValidAggregateExpression(e)) { - throw new TreeNodeException(plan, sExpression not in GROUP BY: $e) -} + aggregateExprs.find { e = +!isValidAggregateExpression(e.transform { + // Should trim aliases around `GetField`s. These aliases are introduced while + // resolving struct field accesses, because `GetField` is not a `NamedExpression`. + // (Should we just turn `GetField` into a `NamedExpression`?) + case Alias(g: GetField, _) = g +}) + }.foreach { e = +throw new TreeNodeException(plan, sExpression not in GROUP BY: $e) } aggregatePlan @@ -328,4 +326,3 @@ object EliminateAnalysisOperators extends Rule[LogicalPlan] { case Subquery(_, child) = child } } -
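A small end-to-end example of the query shape this fixes, built on a throwaway nested dataset; the app name and master URL are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Inner(b: Int)
case class Record(a: Inner)

val sc = new SparkContext(new SparkConf().setAppName("struct-groupby").setMaster("local"))
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

// `a` is a struct column with a field `b`.
val data = sc.parallelize(Seq(Record(Inner(1)), Record(Inner(1)), Record(Inner(2))))
data.registerTempTable("t")

// Previously failed analysis because the grouping expression kept an Alias
// around GetField while the aggregation expression did not.
sqlContext.sql("SELECT a.b + 1, COUNT(*) FROM t GROUP BY a.b + 1").collect().foreach(println)
```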
spark git commit: [SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields
Repository: spark Updated Branches: refs/heads/branch-1.2 680bc0619 - 1cac30083 [SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields While resolving struct fields, the resulted `GetField` expression is wrapped with an `Alias` to make it a named expression. Assume `a` is a struct instance with a field `b`, then `a.b` will be resolved as `Alias(GetField(a, b), b)`. Thus, for this following SQL query: ```sql SELECT a.b + 1 FROM t GROUP BY a.b + 1 ``` the grouping expression is ```scala Add(GetField(a, b), Literal(1, IntegerType)) ``` while the aggregation expression is ```scala Add(Alias(GetField(a, b), b), Literal(1, IntegerType)) ``` This mismatch makes the above SQL query fail during the both analysis and execution phases. This PR fixes this issue by removing the alias when substituting aggregation expressions. !-- Reviewable:start -- [img src=https://reviewable.io/review_button.png; height=40 alt=Review on Reviewable/](https://reviewable.io/reviews/apache/spark/3248) !-- Reviewable:end -- Author: Cheng Lian l...@databricks.com Closes #3248 from liancheng/spark-4322 and squashes the following commits: 23a46ea [Cheng Lian] Code simplification dd20a79 [Cheng Lian] Should only trim aliases around `GetField`s 7f46532 [Cheng Lian] Enables struct fields as sub expressions of grouping fields (cherry picked from commit 0c7b66bd449093bb5d2dafaf91d54e63e601e320) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cac3008 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cac3008 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cac3008 Branch: refs/heads/branch-1.2 Commit: 1cac30083b97c98c3663e2d2cd057124f033eb34 Parents: 680bc06 Author: Cheng Lian l...@databricks.com Authored: Fri Nov 14 15:09:36 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:09:55 2014 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 27 +--- .../spark/sql/catalyst/planning/patterns.scala | 15 --- .../org/apache/spark/sql/SQLQuerySuite.scala| 12 - 3 files changed, 34 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1cac3008/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a448c79..d3b4cf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -60,7 +60,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool ResolveFunctions :: GlobalAggregates :: UnresolvedHavingClauseAttributes :: - TrimAliases :: + TrimGroupingAliases :: typeCoercionRules ++ extendedRules : _*), Batch(Check Analysis, Once, @@ -93,17 +93,10 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool /** * Removes no-op Alias expressions from the plan. 
*/ - object TrimAliases extends Rule[LogicalPlan] { + object TrimGroupingAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Aggregate(groups, aggs, child) = -Aggregate( - groups.map { -_ transform { - case Alias(c, _) = c -} - }, - aggs, - child) +Aggregate(groups.map(_.transform { case Alias(c, _) = c }), aggs, child) } } @@ -122,10 +115,15 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool case e = e.children.forall(isValidAggregateExpression) } - aggregateExprs.foreach { e = -if (!isValidAggregateExpression(e)) { - throw new TreeNodeException(plan, sExpression not in GROUP BY: $e) -} + aggregateExprs.find { e = +!isValidAggregateExpression(e.transform { + // Should trim aliases around `GetField`s. These aliases are introduced while + // resolving struct field accesses, because `GetField` is not a `NamedExpression`. + // (Should we just turn `GetField` into a `NamedExpression`?) + case Alias(g: GetField, _) = g +}) + }.foreach { e = +throw new TreeNodeException(plan, sExpression not in GROUP BY: $e) } aggregatePlan @@ -328,4 +326,3 @@ object
spark git commit: [SPARK-4386] Improve performance when writing Parquet files.
Repository: spark Updated Branches: refs/heads/master 0c7b66bd4 - f76b96837 [SPARK-4386] Improve performance when writing Parquet files. If you profile the writing of a Parquet file, the single worst time consuming call inside of org.apache.spark.sql.parquet.MutableRowWriteSupport.write is actually in the scala.collection.AbstractSequence.size call. This is because the size call actually ends up COUNTING the elements in a scala.collection.LinearSeqOptimized.length (optimized?). This doesn't need to be done. size is called repeatedly where needed rather than called once at the top of the method and stored in a 'val'. Author: Jim Carroll j...@dontcallme.com Closes #3254 from jimfcarroll/parquet-perf and squashes the following commits: 30cc0b5 [Jim Carroll] Improve performance when writing Parquet files. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f76b9683 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f76b9683 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f76b9683 Branch: refs/heads/master Commit: f76b9683706232c3d4e8e6e61627b8188dcb79dc Parents: 0c7b66b Author: Jim Carroll j...@dontcallme.com Authored: Fri Nov 14 15:11:53 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:11:53 2014 -0800 -- .../spark/sql/parquet/ParquetTableSupport.scala | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f76b9683/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 7bc2496..ef3687e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -152,14 +152,15 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { } override def write(record: Row): Unit = { -if (attributes.size record.size) { +val attributesSize = attributes.size +if (attributesSize record.size) { throw new IndexOutOfBoundsException( -sTrying to write more fields than contained in row (${attributes.size}${record.size})) +sTrying to write more fields than contained in row (${attributesSize}${record.size})) } var index = 0 writer.startMessage() -while(index attributes.size) { +while(index attributesSize) { // null values indicate optional fields but we do not check currently if (record(index) != null) { writer.startField(attributes(index).name, index) @@ -312,14 +313,15 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { // Optimized for non-nested rows private[parquet] class MutableRowWriteSupport extends RowWriteSupport { override def write(record: Row): Unit = { -if (attributes.size record.size) { +val attributesSize = attributes.size +if (attributesSize record.size) { throw new IndexOutOfBoundsException( -sTrying to write more fields than contained in row (${attributes.size}${record.size})) +sTrying to write more fields than contained in row (${attributesSize}${record.size})) } var index = 0 writer.startMessage() -while(index attributes.size) { +while(index attributesSize) { // null values indicate optional fields but we do not check currently if (record(index) != null record(index) != Nil) { writer.startField(attributes(index).name, index) - To unsubscribe, e-mail: 
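The hoist matters because `size` on a linear sequence walks the whole collection, so calling it once per field per record turns the write path into quadratic work. A simplified sketch of the change, with the Parquet writer calls elided and names mirroring the patched method:

```scala
def writeSketch(attributes: Seq[String], record: Seq[Any]): Unit = {
  val attributesSize = attributes.size          // computed once per record
  if (attributesSize > record.size) {
    throw new IndexOutOfBoundsException(
      s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
  }
  var index = 0
  while (index < attributesSize) {              // no repeated size calls in the hot loop
    if (record(index) != null) {
      // writer.startField(attributes(index), index) ... writer.endField(...) elided
    }
    index += 1
  }
}
```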
spark git commit: [SPARK-4386] Improve performance when writing Parquet files.
Repository: spark Updated Branches: refs/heads/branch-1.2 1cac30083 - 7f242dc29 [SPARK-4386] Improve performance when writing Parquet files. If you profile the writing of a Parquet file, the single worst time consuming call inside of org.apache.spark.sql.parquet.MutableRowWriteSupport.write is actually in the scala.collection.AbstractSequence.size call. This is because the size call actually ends up COUNTING the elements in a scala.collection.LinearSeqOptimized.length (optimized?). This doesn't need to be done. size is called repeatedly where needed rather than called once at the top of the method and stored in a 'val'. Author: Jim Carroll j...@dontcallme.com Closes #3254 from jimfcarroll/parquet-perf and squashes the following commits: 30cc0b5 [Jim Carroll] Improve performance when writing Parquet files. (cherry picked from commit f76b9683706232c3d4e8e6e61627b8188dcb79dc) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f242dc2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f242dc2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f242dc2 Branch: refs/heads/branch-1.2 Commit: 7f242dc2911bbc821e90fed81421af9b8d6dcd9a Parents: 1cac300 Author: Jim Carroll j...@dontcallme.com Authored: Fri Nov 14 15:11:53 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:12:07 2014 -0800 -- .../spark/sql/parquet/ParquetTableSupport.scala | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f242dc2/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 7bc2496..ef3687e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -152,14 +152,15 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { } override def write(record: Row): Unit = { -if (attributes.size record.size) { +val attributesSize = attributes.size +if (attributesSize record.size) { throw new IndexOutOfBoundsException( -sTrying to write more fields than contained in row (${attributes.size}${record.size})) +sTrying to write more fields than contained in row (${attributesSize}${record.size})) } var index = 0 writer.startMessage() -while(index attributes.size) { +while(index attributesSize) { // null values indicate optional fields but we do not check currently if (record(index) != null) { writer.startField(attributes(index).name, index) @@ -312,14 +313,15 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { // Optimized for non-nested rows private[parquet] class MutableRowWriteSupport extends RowWriteSupport { override def write(record: Row): Unit = { -if (attributes.size record.size) { +val attributesSize = attributes.size +if (attributesSize record.size) { throw new IndexOutOfBoundsException( -sTrying to write more fields than contained in row (${attributes.size}${record.size})) +sTrying to write more fields than contained in row (${attributesSize}${record.size})) } var index = 0 writer.startMessage() -while(index attributes.size) { +while(index attributesSize) { // null values indicate optional fields but we do not check currently if 
(record(index) != null record(index) != Nil) { writer.startField(attributes(index).name, index) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library
Repository: spark Updated Branches: refs/heads/master f76b96837 - 63ca3af66 [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library Since parquet library has been updated , we no longer need to filter the records returned from parquet library for null records , as now the library skips those : from parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java public boolean nextKeyValue() throws IOException, InterruptedException { boolean recordFound = false; while (!recordFound) { // no more records left if (current = total) { return false; } try { checkRead(); currentValue = recordReader.read(); current ++; if (recordReader.shouldSkipCurrentRecord()) { // this record is being filtered via the filter2 package if (DEBUG) LOG.debug(skipping record); continue; } if (currentValue == null) { // only happens with FilteredRecordReader at end of block current = totalCountLoadedSoFar; if (DEBUG) LOG.debug(filtered record reader reached end of block); continue; } recordFound = true; if (DEBUG) LOG.debug(read value: + currentValue); } catch (RuntimeException e) { throw new ParquetDecodingException(format(Can not read value at %d in block %d in file %s, current, currentBlock, file), e); } } return true; } Author: Yash Datta yash.da...@guavus.com Closes #3229 from saucam/remove_filter and squashes the following commits: 8909ae9 [Yash Datta] SPARK-4365: Remove unnecessary filter call on records returned from parquet library Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63ca3af6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63ca3af6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63ca3af6 Branch: refs/heads/master Commit: 63ca3af66f9680fd12adee82fb4d342caae5cea4 Parents: f76b968 Author: Yash Datta yash.da...@guavus.com Authored: Fri Nov 14 15:16:36 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:16:40 2014 -0800 -- .../org/apache/spark/sql/parquet/ParquetTableOperations.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63ca3af6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 5f93279..f6bed50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -159,7 +159,7 @@ case class ParquetTableScan( } } else { baseRDD.map(_._2) -}.filter(_ != null) // Parquet's record filters may produce null values +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library
Repository: spark Updated Branches: refs/heads/branch-1.2 7f242dc29 - aa5d8e57c [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library Since parquet library has been updated , we no longer need to filter the records returned from parquet library for null records , as now the library skips those : from parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java public boolean nextKeyValue() throws IOException, InterruptedException { boolean recordFound = false; while (!recordFound) { // no more records left if (current = total) { return false; } try { checkRead(); currentValue = recordReader.read(); current ++; if (recordReader.shouldSkipCurrentRecord()) { // this record is being filtered via the filter2 package if (DEBUG) LOG.debug(skipping record); continue; } if (currentValue == null) { // only happens with FilteredRecordReader at end of block current = totalCountLoadedSoFar; if (DEBUG) LOG.debug(filtered record reader reached end of block); continue; } recordFound = true; if (DEBUG) LOG.debug(read value: + currentValue); } catch (RuntimeException e) { throw new ParquetDecodingException(format(Can not read value at %d in block %d in file %s, current, currentBlock, file), e); } } return true; } Author: Yash Datta yash.da...@guavus.com Closes #3229 from saucam/remove_filter and squashes the following commits: 8909ae9 [Yash Datta] SPARK-4365: Remove unnecessary filter call on records returned from parquet library (cherry picked from commit 63ca3af66f9680fd12adee82fb4d342caae5cea4) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa5d8e57 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa5d8e57 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa5d8e57 Branch: refs/heads/branch-1.2 Commit: aa5d8e57c63d045b291a5c1fc99e782a0f191854 Parents: 7f242dc Author: Yash Datta yash.da...@guavus.com Authored: Fri Nov 14 15:16:36 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Nov 14 15:17:12 2014 -0800 -- .../org/apache/spark/sql/parquet/ParquetTableOperations.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aa5d8e57/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 5f93279..f6bed50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -159,7 +159,7 @@ case class ParquetTableScan( } } else { baseRDD.map(_._2) -}.filter(_ != null) // Parquet's record filters may produce null values +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-4214. With dynamic allocation, avoid outstanding requests for more...
Repository: spark Updated Branches: refs/heads/branch-1.2 ef39ec419 - c425e31ad SPARK-4214. With dynamic allocation, avoid outstanding requests for more... ... executors than pending tasks need. WIP. Still need to add and fix tests. Author: Sandy Ryza sa...@cloudera.com Closes #3204 from sryza/sandy-spark-4214 and squashes the following commits: 35cf0e0 [Sandy Ryza] Add comment 13b53df [Sandy Ryza] Review feedback 067465f [Sandy Ryza] Whitespace fix 6ae080c [Sandy Ryza] Add tests and get num pending tasks from ExecutorAllocationListener 531e2b6 [Sandy Ryza] SPARK-4214. With dynamic allocation, avoid outstanding requests for more executors than pending tasks need. (cherry picked from commit ad42b283246b93654c5fd731cd618fee74d8c4da) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c425e31a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c425e31a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c425e31a Branch: refs/heads/branch-1.2 Commit: c425e31ad0132ddb0a817b26fe1e5d11a7ef7a63 Parents: ef39ec4 Author: Sandy Ryza sa...@cloudera.com Authored: Fri Nov 14 15:51:05 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 15:51:50 2014 -0800 -- .../spark/ExecutorAllocationManager.scala | 55 .../spark/ExecutorAllocationManagerSuite.scala | 48 + 2 files changed, 94 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c425e31a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index ef93009..88adb89 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,7 +28,9 @@ import org.apache.spark.scheduler._ * the scheduler queue is not drained in N seconds, then new executors are added. If the queue * persists for another M seconds, then more executors are added and so on. The number added * in each round increases exponentially from the previous round until an upper bound on the - * number of executors has been reached. + * number of executors has been reached. The upper bound is based both on a configured property + * and on the number of tasks pending: the policy will never increase the number of executor + * requests past the number needed to handle all pending tasks. * * The rationale for the exponential increase is twofold: (1) Executors should be added slowly * in the beginning in case the number of extra executors needed turns out to be small. 
Otherwise, @@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.getBoolean(spark.dynamicAllocation.testing, false) + // TODO: The default value of 1 for spark.executor.cores works right now because dynamic + // allocation is only supported for YARN and the default number of cores per executor in YARN is + // 1, but it might need to be attained differently for different cluster managers + private val tasksPerExecutor = +conf.getInt(spark.executor.cores, 1) / conf.getInt(spark.task.cpus, 1) + validateSettings() // Number of executors to add in the next round @@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Clock used to schedule when executors should be added and removed private var clock: Clock = new RealClock + // Listener for Spark events that impact the allocation policy + private val listener = new ExecutorAllocationListener(this) + /** * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. @@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging throw new SparkException(Dynamic allocation of executors requires the external + shuffle service. You may enable this through spark.shuffle.service.enabled.) } +if (tasksPerExecutor == 0) { + throw new SparkException(spark.executor.cores must not be less than spark.task.cpus.cores) +} } /** @@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging * Register for scheduler callbacks to decide when to add and remove executors. */ def start(): Unit = { -val listener = new ExecutorAllocationListener(this)
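The cap the new policy enforces can be written as a one-liner: outstanding executor requests are never allowed to exceed what the pending tasks could actually occupy. A sketch with illustrative names, not the actual ExecutorAllocationManager fields:

```scala
// Never keep more outstanding executor requests than pending tasks could fill.
def maxExecutorsNeeded(pendingTasks: Int, executorCores: Int, cpusPerTask: Int): Int = {
  val tasksPerExecutor = math.max(executorCores / cpusPerTask, 1) // e.g. 1 with YARN defaults
  // Round up so a partial executor's worth of tasks still gets an executor.
  (pendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}

// With the defaults (1 core per executor, 1 cpu per task), 7 pending tasks cap
// the request at 7 executors, whatever spark.dynamicAllocation.maxExecutors says.
maxExecutorsNeeded(pendingTasks = 7, executorCores = 1, cpusPerTask = 1)  // => 7
```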
spark git commit: [SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-proc...
Repository: spark Updated Branches: refs/heads/master ad42b2832 - 303a4e4d2 [SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-proc... ...ess ends https://issues.apache.org/jira/browse/SPARK-4404 When we have spark.driver.extra* or spark.driver.memory in SPARK_SUBMIT_PROPERTIES_FILE, spark-class will use SparkSubmitDriverBootstrapper to launch the driver. If we get the process id of SparkSubmitDriverBootstrapper and want to kill it while it is running, we expect its SparkSubmit sub-process to stop as well. Author: WangTao barneystin...@aliyun.com Author: WangTaoTheTonic barneystin...@aliyun.com Closes #3266 from WangTaoTheTonic/killsubmit and squashes the following commits: e03eba5 [WangTaoTheTonic] add comments 57b5ca1 [WangTao] SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-process ends Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/303a4e4d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/303a4e4d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/303a4e4d Branch: refs/heads/master Commit: 303a4e4d23e5cd93b541480cf88d5badb9cf9622 Parents: ad42b28 Author: WangTao barneystin...@aliyun.com Authored: Fri Nov 14 20:11:51 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 20:11:51 2014 -0800 -- .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 10 ++ 1 file changed, 10 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/303a4e4d/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 2b894a7..729 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -129,6 +129,16 @@ private[spark] object SparkSubmitDriverBootstrapper { val process = builder.start() +// If we kill an app while it's running, its sub-process should be killed too. +Runtime.getRuntime().addShutdownHook(new Thread() { + override def run() = { +if (process != null) { + process.destroy() + sys.exit(process.waitFor()) +} + } +}) + // Redirect stdout and stderr from the child JVM val stdoutThread = new RedirectThread(process.getInputStream, System.out, redirect stdout) val stderrThread = new RedirectThread(process.getErrorStream, System.err, redirect stderr) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
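The hook added above is ordinary JVM shutdown-hook machinery. A standalone sketch of the same pattern, with a placeholder child command standing in for the SparkSubmit sub-process:

```scala
// Sketch of the parent-kills-child pattern from the patch above; "sleep" is a stand-in child.
object ShutdownHookSketch {
  def main(args: Array[String]): Unit = {
    val process = new ProcessBuilder("sleep", "600").start()
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        // Runs when this JVM is asked to exit (e.g. SIGTERM): stop the child before leaving.
        process.destroy()
        process.waitFor()
      }
    })
    process.waitFor()
  }
}
```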
spark git commit: [SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-proc...
Repository: spark Updated Branches: refs/heads/branch-1.2 c425e31ad - 118c89c28 [SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-proc... ...ess ends https://issues.apache.org/jira/browse/SPARK-4404 When we have spark.driver.extra* or spark.driver.memory in SPARK_SUBMIT_PROPERTIES_FILE, spark-class will use SparkSubmitDriverBootstrapper to launch the driver. If we get the process id of SparkSubmitDriverBootstrapper and want to kill it while it is running, we expect its SparkSubmit sub-process to stop as well. Author: WangTao barneystin...@aliyun.com Author: WangTaoTheTonic barneystin...@aliyun.com Closes #3266 from WangTaoTheTonic/killsubmit and squashes the following commits: e03eba5 [WangTaoTheTonic] add comments 57b5ca1 [WangTao] SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-process ends (cherry picked from commit 303a4e4d23e5cd93b541480cf88d5badb9cf9622) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/118c89c2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/118c89c2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/118c89c2 Branch: refs/heads/branch-1.2 Commit: 118c89c28d1c3c048a5bd0335db4a0c65d71a4aa Parents: c425e31 Author: WangTao barneystin...@aliyun.com Authored: Fri Nov 14 20:11:51 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 20:12:05 2014 -0800 -- .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 10 ++ 1 file changed, 10 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/118c89c2/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 2b894a7..729 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -129,6 +129,16 @@ private[spark] object SparkSubmitDriverBootstrapper { val process = builder.start() +// If we kill an app while it's running, its sub-process should be killed too. +Runtime.getRuntime().addShutdownHook(new Thread() { + override def run() = { +if (process != null) { + process.destroy() + sys.exit(process.waitFor()) +} + } +}) + // Redirect stdout and stderr from the child JVM val stdoutThread = new RedirectThread(process.getInputStream, System.out, redirect stdout) val stderrThread = new RedirectThread(process.getErrorStream, System.err, redirect stderr) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4415] [PySpark] JVM should exit after Python exit
Repository: spark Updated Branches: refs/heads/branch-1.2 118c89c28 - 306e68cf0 [SPARK-4415] [PySpark] JVM should exit after Python exit When JVM is started in a Python process, it should exit once the stdin is closed. test: add spark.driver.memory in conf/spark-defaults.conf ``` daviesdm:~/work/spark$ cat conf/spark-defaults.conf spark.driver.memory 8g daviesdm:~/work/spark$ bin/pyspark quit daviesdm:~/work/spark$ jps 4931 Jps 286 daviesdm:~/work/spark$ python wc.py 943738 0.719928026199 daviesdm:~/work/spark$ jps 286 4990 Jps ``` Author: Davies Liu dav...@databricks.com Closes #3274 from davies/exit and squashes the following commits: df0e524 [Davies Liu] address comments ce8599c [Davies Liu] address comments 050651f [Davies Liu] JVM should exit after Python exit (cherry picked from commit 7fe08b43c78bf9e8515f671e72aa03a83ea782f8) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306e68cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306e68cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306e68cf Branch: refs/heads/branch-1.2 Commit: 306e68cf00e6ec6b10f1a29eb7434f3f3ea27752 Parents: 118c89c Author: Davies Liu dav...@databricks.com Authored: Fri Nov 14 20:13:46 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 20:14:40 2014 -0800 -- bin/pyspark | 2 -- bin/pyspark2.cmd | 1 - .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 11 ++- python/pyspark/java_gateway.py | 4 +++- 4 files changed, 9 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/bin/pyspark -- diff --git a/bin/pyspark b/bin/pyspark index 1d8c94d..0b4f695 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -132,7 +132,5 @@ if [[ $1 =~ \.py$ ]]; then gatherSparkSubmitOpts $@ exec $FWDIR/bin/spark-submit ${SUBMISSION_OPTS[@]} $primary ${APPLICATION_OPTS[@]} else - # PySpark shell requires special handling downstream - export PYSPARK_SHELL=1 exec $PYSPARK_DRIVER_PYTHON $PYSPARK_DRIVER_PYTHON_OPTS fi http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/bin/pyspark2.cmd -- diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index 59415e9..a542ec8 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R \.py') do ( ) if [%PYTHON_FILE%] == [] ( - set PYSPARK_SHELL=1 if [%IPYTHON%] == [1] ( ipython %IPYTHON_OPTS% ) else ( http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 729..aa3743c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper { // subprocess there already reads directly from our stdin, so we should avoid spawning a // thread that contends with the subprocess in reading from System.in. 
val isWindows = Utils.isWindows -val isPySparkShell = sys.env.contains(PYSPARK_SHELL) +val isSubprocess = sys.env.contains(IS_SUBPROCESS) if (!isWindows) { val stdinThread = new RedirectThread(System.in, process.getOutputStream, redirect stdin) stdinThread.start() - // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM - // should terminate on broken pipe, which signals that the parent process has exited. In - // Windows, the termination logic for the PySpark shell is handled in java_gateway.py - if (isPySparkShell) { + // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on + // broken pipe, signaling that the parent process has exited. This is the case if the + // application is launched directly from python, as in the PySpark shell. In Windows, + // the termination logic is handled in java_gateway.py + if (isSubprocess) { stdinThread.join() process.destroy() } http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/python/pyspark/java_gateway.py -- diff --git
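The renamed flag simply tells a JVM that was launched as someone else's subprocess to treat end-of-stdin as a signal that its parent has gone away. A hedged sketch of that gate (IS_SUBPROCESS is the environment variable from the patch; everything else is illustrative):

```scala
// Sketch of the exit gate described above, not the actual bootstrapper code.
object SubprocessExitSketch {
  def main(args: Array[String]): Unit = {
    // Only a JVM launched as a subprocess (e.g. from python) should die with its parent.
    if (sys.env.contains("IS_SUBPROCESS")) {
      // Block until the parent closes our stdin (read() returns -1), then shut down.
      while (System.in.read() != -1) {}
      sys.exit(0)
    }
  }
}
```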
spark git commit: [SPARK-4415] [PySpark] JVM should exit after Python exit
Repository: spark Updated Branches: refs/heads/master 303a4e4d2 - 7fe08b43c [SPARK-4415] [PySpark] JVM should exit after Python exit When JVM is started in a Python process, it should exit once the stdin is closed. test: add spark.driver.memory in conf/spark-defaults.conf ``` daviesdm:~/work/spark$ cat conf/spark-defaults.conf spark.driver.memory 8g daviesdm:~/work/spark$ bin/pyspark quit daviesdm:~/work/spark$ jps 4931 Jps 286 daviesdm:~/work/spark$ python wc.py 943738 0.719928026199 daviesdm:~/work/spark$ jps 286 4990 Jps ``` Author: Davies Liu dav...@databricks.com Closes #3274 from davies/exit and squashes the following commits: df0e524 [Davies Liu] address comments ce8599c [Davies Liu] address comments 050651f [Davies Liu] JVM should exit after Python exit Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fe08b43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fe08b43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fe08b43 Branch: refs/heads/master Commit: 7fe08b43c78bf9e8515f671e72aa03a83ea782f8 Parents: 303a4e4 Author: Davies Liu dav...@databricks.com Authored: Fri Nov 14 20:13:46 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Fri Nov 14 20:14:33 2014 -0800 -- bin/pyspark | 2 -- bin/pyspark2.cmd | 1 - .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 11 ++- python/pyspark/java_gateway.py | 4 +++- 4 files changed, 9 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark -- diff --git a/bin/pyspark b/bin/pyspark index 1d8c94d..0b4f695 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -132,7 +132,5 @@ if [[ $1 =~ \.py$ ]]; then gatherSparkSubmitOpts $@ exec $FWDIR/bin/spark-submit ${SUBMISSION_OPTS[@]} $primary ${APPLICATION_OPTS[@]} else - # PySpark shell requires special handling downstream - export PYSPARK_SHELL=1 exec $PYSPARK_DRIVER_PYTHON $PYSPARK_DRIVER_PYTHON_OPTS fi http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark2.cmd -- diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index 59415e9..a542ec8 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R \.py') do ( ) if [%PYTHON_FILE%] == [] ( - set PYSPARK_SHELL=1 if [%IPYTHON%] == [1] ( ipython %IPYTHON_OPTS% ) else ( http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 729..aa3743c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper { // subprocess there already reads directly from our stdin, so we should avoid spawning a // thread that contends with the subprocess in reading from System.in. val isWindows = Utils.isWindows -val isPySparkShell = sys.env.contains(PYSPARK_SHELL) +val isSubprocess = sys.env.contains(IS_SUBPROCESS) if (!isWindows) { val stdinThread = new RedirectThread(System.in, process.getOutputStream, redirect stdin) stdinThread.start() - // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM - // should terminate on broken pipe, which signals that the parent process has exited. 
In - // Windows, the termination logic for the PySpark shell is handled in java_gateway.py - if (isPySparkShell) { + // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on + // broken pipe, signaling that the parent process has exited. This is the case if the + // application is launched directly from python, as in the PySpark shell. In Windows, + // the termination logic is handled in java_gateway.py + if (isSubprocess) { stdinThread.join() process.destroy() } http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 9c70fa5..a975dc1 100644 ---
spark git commit: [SPARK-4379][Core] Change Exception to SparkException in checkpoint
Repository: spark Updated Branches: refs/heads/master 7fe08b43c - dba140582 [SPARK-4379][Core] Change Exception to SparkException in checkpoint It's better to change to SparkException. However, it's a breaking change since it will change the exception type. Author: zsxwing zsxw...@gmail.com Closes #3241 from zsxwing/SPARK-4379 and squashes the following commits: 409f3af [zsxwing] Change Exception to SparkException in checkpoint Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba14058 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba14058 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba14058 Branch: refs/heads/master Commit: dba14058230194122a715c219e35ab8eaa786321 Parents: 7fe08b4 Author: zsxwing zsxw...@gmail.com Authored: Fri Nov 14 22:25:41 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 22:25:41 2014 -0800 -- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dba14058/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 716f2dd..cb64d43 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1202,7 +1202,7 @@ abstract class RDD[T: ClassTag]( */ def checkpoint() { if (context.checkpointDir.isEmpty) { - throw new Exception(Checkpoint directory has not been set in the SparkContext) + throw new SparkException(Checkpoint directory has not been set in the SparkContext) } else if (checkpointData.isEmpty) { checkpointData = Some(new RDDCheckpointData(this)) checkpointData.get.markForCheckpoint() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
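From a user's perspective the visible change is only the exception type, which can now be handled together with other Spark errors. A small usage sketch (the local master and checkpoint path are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Usage sketch for the change above: checkpoint() without a checkpoint directory now
// fails with a SparkException instead of a bare Exception.
object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 10)
    try {
      rdd.checkpoint() // no checkpoint directory set yet
    } catch {
      case e: SparkException => println(s"expected failure: ${e.getMessage}")
    }
    sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path
    rdd.checkpoint()
    rdd.count() // running a job materializes the RDD and writes the checkpoint
    sc.stop()
  }
}
```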
spark git commit: [SPARK-4379][Core] Change Exception to SparkException in checkpoint
Repository: spark Updated Branches: refs/heads/branch-1.2 306e68cf0 - e27fa40ed [SPARK-4379][Core] Change Exception to SparkException in checkpoint It's better to change to SparkException. However, it's a breaking change since it will change the exception type. Author: zsxwing zsxw...@gmail.com Closes #3241 from zsxwing/SPARK-4379 and squashes the following commits: 409f3af [zsxwing] Change Exception to SparkException in checkpoint (cherry picked from commit dba14058230194122a715c219e35ab8eaa786321) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e27fa40e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e27fa40e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e27fa40e Branch: refs/heads/branch-1.2 Commit: e27fa40ed16c1b1d04911e0bdd803a4d43eb9a10 Parents: 306e68c Author: zsxwing zsxw...@gmail.com Authored: Fri Nov 14 22:25:41 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 22:25:49 2014 -0800 -- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e27fa40e/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 716f2dd..cb64d43 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1202,7 +1202,7 @@ abstract class RDD[T: ClassTag]( */ def checkpoint() { if (context.checkpointDir.isEmpty) { - throw new Exception(Checkpoint directory has not been set in the SparkContext) + throw new SparkException(Checkpoint directory has not been set in the SparkContext) } else if (checkpointData.isEmpty) { checkpointData = Some(new RDDCheckpointData(this)) checkpointData.get.markForCheckpoint() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4363][Doc] Update the Broadcast example
Repository: spark Updated Branches: refs/heads/branch-1.2 e27fa40ed - 29a6da372 [SPARK-4363][Doc] Update the Broadcast example Author: zsxwing zsxw...@gmail.com Closes #3226 from zsxwing/SPARK-4363 and squashes the following commits: 8109914 [zsxwing] Update the Broadcast example (cherry picked from commit 861223ee5bea8e434a9ebb0d53f436ce23809f9c) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29a6da37 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29a6da37 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29a6da37 Branch: refs/heads/branch-1.2 Commit: 29a6da37257d8a165967392af6f452a404e445cd Parents: e27fa40 Author: zsxwing zsxw...@gmail.com Authored: Fri Nov 14 22:28:48 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 22:28:58 2014 -0800 -- core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 2 +- docs/programming-guide.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29a6da37/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 87f5cf9..a5ea478 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -39,7 +39,7 @@ import scala.reflect.ClassTag * * {{{ * scala val broadcastVar = sc.broadcast(Array(1, 2, 3)) - * broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) + * broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) * * scala broadcastVar.value * res0: Array[Int] = Array(1, 2, 3) http://git-wip-us.apache.org/repos/asf/spark/blob/29a6da37/docs/programming-guide.md -- diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 18420af..9de2f91 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1131,7 +1131,7 @@ method. The code below shows this: {% highlight scala %} scala val broadcastVar = sc.broadcast(Array(1, 2, 3)) -broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) +broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala broadcastVar.value res0: Array[Int] = Array(1, 2, 3) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
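For context beyond the corrected REPL transcript, a small end-to-end sketch of how a broadcast variable is typically used (the app name, master, and data are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative broadcast usage: ship a lookup table to executors once and read it from tasks.
object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-demo").setMaster("local[2]"))
    val countryNames = sc.broadcast(Map("de" -> "Germany", "fr" -> "France"))
    val codes = sc.parallelize(Seq("de", "fr", "de"))
    // Tasks dereference the broadcast value instead of capturing the map in every closure.
    val names = codes.map(code => countryNames.value.getOrElse(code, "unknown")).collect()
    println(names.mkString(", "))
    sc.stop()
  }
}
```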
spark git commit: [SPARK-4363][Doc] Update the Broadcast example
Repository: spark Updated Branches: refs/heads/master dba140582 - 861223ee5 [SPARK-4363][Doc] Update the Broadcast example Author: zsxwing zsxw...@gmail.com Closes #3226 from zsxwing/SPARK-4363 and squashes the following commits: 8109914 [zsxwing] Update the Broadcast example Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/861223ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/861223ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/861223ee Branch: refs/heads/master Commit: 861223ee5bea8e434a9ebb0d53f436ce23809f9c Parents: dba1405 Author: zsxwing zsxw...@gmail.com Authored: Fri Nov 14 22:28:48 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 22:28:48 2014 -0800 -- core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 2 +- docs/programming-guide.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/861223ee/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 87f5cf9..a5ea478 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -39,7 +39,7 @@ import scala.reflect.ClassTag * * {{{ * scala val broadcastVar = sc.broadcast(Array(1, 2, 3)) - * broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) + * broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) * * scala broadcastVar.value * res0: Array[Int] = Array(1, 2, 3) http://git-wip-us.apache.org/repos/asf/spark/blob/861223ee/docs/programming-guide.md -- diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 18420af..9de2f91 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1131,7 +1131,7 @@ method. The code below shows this: {% highlight scala %} scala val broadcastVar = sc.broadcast(Array(1, 2, 3)) -broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) +broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala broadcastVar.value res0: Array[Int] = Array(1, 2, 3) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4260] Httpbroadcast should set connection timeout.
Repository: spark Updated Branches: refs/heads/master 861223ee5 - 60969b033 [SPARK-4260] Httpbroadcast should set connection timeout. Httpbroadcast sets read timeout but doesn't set connection timeout. Author: Kousuke Saruta saru...@oss.nttdata.co.jp Closes #3122 from sarutak/httpbroadcast-timeout and squashes the following commits: c7f3a56 [Kousuke Saruta] Added Connection timeout for Http Connection to HttpBroadcast.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60969b03 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60969b03 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60969b03 Branch: refs/heads/master Commit: 60969b0336930449a826821a48f83f65337e8856 Parents: 861223e Author: Kousuke Saruta saru...@oss.nttdata.co.jp Authored: Fri Nov 14 22:36:56 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 22:36:56 2014 -0800 -- core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60969b03/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 7dade04..31f0a46 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -191,10 +191,12 @@ private[broadcast] object HttpBroadcast extends Logging { logDebug(broadcast security enabled) val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager) uc = newuri.toURL.openConnection() + uc.setConnectTimeout(httpReadTimeout) uc.setAllowUserInteraction(false) } else { logDebug(broadcast not using security) uc = new URL(url).openConnection() + uc.setConnectTimeout(httpReadTimeout) } val in = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
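The underlying issue is generic java.net behaviour: a read timeout does not bound how long the initial connection attempt may hang, so both timeouts need to be set. A standalone sketch (the URL and timeout value are placeholders, not Spark's HttpBroadcast code):

```scala
import java.net.URL

// Sketch of pairing a connect timeout with a read timeout on a URLConnection.
object ConnectTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val timeoutMs = 60 * 1000
    val uc = new URL("http://example.com/broadcast_1").openConnection() // placeholder URL
    uc.setConnectTimeout(timeoutMs) // bound the time spent establishing the connection
    uc.setReadTimeout(timeoutMs)    // bound the time spent waiting for data once connected
    val in = uc.getInputStream
    try {
      println(s"first byte: ${in.read()}")
    } finally {
      in.close()
    }
  }
}
```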
spark git commit: Added contains(key) to Metadata
Repository: spark Updated Branches: refs/heads/master 60969b033 - cbddac236 Added contains(key) to Metadata Add contains(key) to org.apache.spark.sql.catalyst.util.Metadata to test the existence of a key. Otherwise, Class Metadata's get methods may throw NoSuchElement exception if the key does not exist. Testcases are added to MetadataSuite as well. Author: kai kaiz...@eecs.berkeley.edu Closes #3273 from kai-zeng/metadata-fix and squashes the following commits: 74b3d03 [kai] Added contains(key) to Metadata Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbddac23 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbddac23 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbddac23 Branch: refs/heads/master Commit: cbddac23696d89b672dce380cc7360a873e27b3b Parents: 60969b0 Author: kai kaiz...@eecs.berkeley.edu Authored: Fri Nov 14 23:44:23 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 23:44:23 2014 -0800 -- .../org/apache/spark/sql/catalyst/util/Metadata.scala | 3 +++ .../apache/spark/sql/catalyst/util/MetadataSuite.scala | 13 + 2 files changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cbddac23/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala old mode 100644 new mode 100755 index 2f2082f..8172733 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala @@ -34,6 +34,9 @@ import org.json4s.jackson.JsonMethods._ */ sealed class Metadata private[util] (private[util] val map: Map[String, Any]) extends Serializable { + /** Tests whether this Metadata contains a binding for a key. */ + def contains(key: String): Boolean = map.contains(key) + /** Gets a Long. 
*/ def getLong(key: String): Long = get(key) http://git-wip-us.apache.org/repos/asf/spark/blob/cbddac23/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala old mode 100644 new mode 100755 index 0063d31..f005b7d --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala @@ -56,17 +56,30 @@ class MetadataSuite extends FunSuite { .build() test(metadata builder and getters) { +assert(age.contains(summary) === false) +assert(age.contains(index) === true) assert(age.getLong(index) === 1L) +assert(age.contains(average) === true) assert(age.getDouble(average) === 45.0) +assert(age.contains(categorical) === true) assert(age.getBoolean(categorical) === false) +assert(age.contains(name) === true) assert(age.getString(name) === age) +assert(metadata.contains(purpose) === true) assert(metadata.getString(purpose) === ml) +assert(metadata.contains(isBase) === true) assert(metadata.getBoolean(isBase) === false) +assert(metadata.contains(summary) === true) assert(metadata.getMetadata(summary) === summary) +assert(metadata.contains(long[]) === true) assert(metadata.getLongArray(long[]).toSeq === Seq(0L, 1L)) +assert(metadata.contains(double[]) === true) assert(metadata.getDoubleArray(double[]).toSeq === Seq(3.0, 4.0)) +assert(metadata.contains(boolean[]) === true) assert(metadata.getBooleanArray(boolean[]).toSeq === Seq(true, false)) +assert(gender.contains(categories) === true) assert(gender.getStringArray(categories).toSeq === Seq(male, female)) +assert(metadata.contains(features) === true) assert(metadata.getMetadataArray(features).toSeq === Seq(age, gender)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
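A usage sketch of the new guard, assuming the MetadataBuilder API exercised in MetadataSuite (the keys and values here are illustrative):

```scala
import org.apache.spark.sql.catalyst.util.MetadataBuilder

// Sketch: check contains(key) before calling get* so a missing key no longer throws.
object MetadataContainsSketch {
  def main(args: Array[String]): Unit = {
    val meta = new MetadataBuilder().putLong("index", 1L).putString("name", "age").build()
    if (meta.contains("index")) {
      println(meta.getLong("index")) // safe: the key is present
    }
    // Fall back to a default instead of letting getDouble throw for an absent key.
    val avg = if (meta.contains("average")) meta.getDouble("average") else 0.0
    println(avg)
  }
}
```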
spark git commit: Added contains(key) to Metadata
Repository: spark Updated Branches: refs/heads/branch-1.2 37716b795 - c044e1241 Added contains(key) to Metadata Add contains(key) to org.apache.spark.sql.catalyst.util.Metadata to test the existence of a key. Otherwise, Class Metadata's get methods may throw NoSuchElement exception if the key does not exist. Testcases are added to MetadataSuite as well. Author: kai kaiz...@eecs.berkeley.edu Closes #3273 from kai-zeng/metadata-fix and squashes the following commits: 74b3d03 [kai] Added contains(key) to Metadata (cherry picked from commit cbddac23696d89b672dce380cc7360a873e27b3b) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c044e124 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c044e124 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c044e124 Branch: refs/heads/branch-1.2 Commit: c044e124115cc8e9ffb44d12c2744f33362f366f Parents: 37716b7 Author: kai kaiz...@eecs.berkeley.edu Authored: Fri Nov 14 23:44:23 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 23:44:40 2014 -0800 -- .../org/apache/spark/sql/catalyst/util/Metadata.scala | 3 +++ .../apache/spark/sql/catalyst/util/MetadataSuite.scala | 13 + 2 files changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c044e124/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala old mode 100644 new mode 100755 index 2f2082f..8172733 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala @@ -34,6 +34,9 @@ import org.json4s.jackson.JsonMethods._ */ sealed class Metadata private[util] (private[util] val map: Map[String, Any]) extends Serializable { + /** Tests whether this Metadata contains a binding for a key. */ + def contains(key: String): Boolean = map.contains(key) + /** Gets a Long. 
*/ def getLong(key: String): Long = get(key) http://git-wip-us.apache.org/repos/asf/spark/blob/c044e124/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala old mode 100644 new mode 100755 index 0063d31..f005b7d --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala @@ -56,17 +56,30 @@ class MetadataSuite extends FunSuite { .build() test(metadata builder and getters) { +assert(age.contains(summary) === false) +assert(age.contains(index) === true) assert(age.getLong(index) === 1L) +assert(age.contains(average) === true) assert(age.getDouble(average) === 45.0) +assert(age.contains(categorical) === true) assert(age.getBoolean(categorical) === false) +assert(age.contains(name) === true) assert(age.getString(name) === age) +assert(metadata.contains(purpose) === true) assert(metadata.getString(purpose) === ml) +assert(metadata.contains(isBase) === true) assert(metadata.getBoolean(isBase) === false) +assert(metadata.contains(summary) === true) assert(metadata.getMetadata(summary) === summary) +assert(metadata.contains(long[]) === true) assert(metadata.getLongArray(long[]).toSeq === Seq(0L, 1L)) +assert(metadata.contains(double[]) === true) assert(metadata.getDoubleArray(double[]).toSeq === Seq(3.0, 4.0)) +assert(metadata.contains(boolean[]) === true) assert(metadata.getBooleanArray(boolean[]).toSeq === Seq(true, false)) +assert(gender.contains(categories) === true) assert(gender.getStringArray(categories).toSeq === Seq(male, female)) +assert(metadata.contains(features) === true) assert(metadata.getMetadataArray(features).toSeq === Seq(age, gender)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-2321] Several progress API improvements / refactorings
Repository: spark Updated Branches: refs/heads/branch-1.2 c044e1241 - 9eac5fee6 [SPARK-2321] Several progress API improvements / refactorings This PR refactors / extends the status API introduced in #2696. - Change StatusAPI from a mixin trait to a class. Before, the new status API methods were directly accessible through SparkContext, whereas now they're accessed through a `sc.statusAPI` field. As long as we were going to add these methods directly to SparkContext, the mixin trait seemed like a good idea, but this might be simpler to reason about and may avoid pitfalls that I've run into while attempting to refactor other parts of SparkContext to use mixins (see #3071, for example). - Change the name from SparkStatusAPI to SparkStatusTracker. - Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated with any job group. - Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids of whatever's currently active in this SparkContext. This should simplify davies's progress bar code. Author: Josh Rosen joshro...@databricks.com Closes #3197 from JoshRosen/progress-api-improvements and squashes the following commits: 30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker. d1b08d8 [Josh Rosen] Add missing newlines 2cc7353 [Josh Rosen] Add missing file. d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods. a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default group c47e294 [Josh Rosen] Remove StatusAPI mixin trait. (cherry picked from commit 40eb8b6ef3a67e36d0d9492c044981a1da76351d) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9eac5fee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9eac5fee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9eac5fee Branch: refs/heads/branch-1.2 Commit: 9eac5fee64def9a18d8961069f631a176f339a5b Parents: c044e12 Author: Josh Rosen joshro...@databricks.com Authored: Fri Nov 14 23:46:25 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 23:46:42 2014 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 68 - .../scala/org/apache/spark/SparkStatusAPI.scala | 142 --- .../org/apache/spark/SparkStatusTracker.scala | 107 ++ .../spark/api/java/JavaSparkContext.scala | 21 +-- .../spark/api/java/JavaSparkStatusTracker.scala | 72 ++ .../scala/org/apache/spark/StatusAPISuite.scala | 78 -- .../org/apache/spark/StatusTrackerSuite.scala | 89 .../spark/examples/JavaStatusAPIDemo.java | 70 - .../spark/examples/JavaStatusTrackerDemo.java | 70 + 9 files changed, 407 insertions(+), 310 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9eac5fee/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03ea672..65edeef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger import java.util.UUID.randomUUID import scala.collection.{Map, Set} +import scala.collection.JavaConversions._ import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} @@ -61,7 +62,7 @@ import org.apache.spark.util._ * this config overrides the default configs as well as system properties. 
*/ -class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { +class SparkContext(config: SparkConf) extends Logging { // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It @@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { private[spark] val jobProgressListener = new JobProgressListener(conf) listenerBus.addListener(jobProgressListener) + val statusTracker = new SparkStatusTracker(this) + // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean(spark.ui.enabled, true)) { @@ -1002,6 +1005,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { def version = SPARK_VERSION /** + * Return a map from the slave to the max memory available for caching and the remaining + * memory available for caching. + */ + def getExecutorMemoryStatus: Map[String, (Long, Long)] = { +
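A usage sketch of the new entry point, based only on the methods named in this commit; the job group and workload are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // brings countAsync into scope on RDDs

// Sketch: poll progress through sc.statusTracker instead of methods mixed into SparkContext.
object StatusTrackerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("status-demo").setMaster("local[2]"))
    sc.setJobGroup("etl", "demo job group")
    // Kick off a slow job asynchronously so there is something to observe.
    sc.parallelize(1 to 10000, 8).map { i => Thread.sleep(1); i * 2 }.countAsync()
    Thread.sleep(500)
    val tracker = sc.statusTracker
    println("active jobs:   " + tracker.getActiveJobIds().toSeq)
    println("active stages: " + tracker.getActiveStageIds().toSeq)
    println("jobs in group: " + tracker.getJobIdsForGroup("etl").toSeq)
    sc.stop()
  }
}
```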
spark git commit: [SPARK-2321] Several progress API improvements / refactorings
Repository: spark Updated Branches: refs/heads/master cbddac236 - 40eb8b6ef [SPARK-2321] Several progress API improvements / refactorings This PR refactors / extends the status API introduced in #2696. - Change StatusAPI from a mixin trait to a class. Before, the new status API methods were directly accessible through SparkContext, whereas now they're accessed through a `sc.statusAPI` field. As long as we were going to add these methods directly to SparkContext, the mixin trait seemed like a good idea, but this might be simpler to reason about and may avoid pitfalls that I've run into while attempting to refactor other parts of SparkContext to use mixins (see #3071, for example). - Change the name from SparkStatusAPI to SparkStatusTracker. - Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated with any job group. - Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids of whatever's currently active in this SparkContext. This should simplify davies's progress bar code. Author: Josh Rosen joshro...@databricks.com Closes #3197 from JoshRosen/progress-api-improvements and squashes the following commits: 30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker. d1b08d8 [Josh Rosen] Add missing newlines 2cc7353 [Josh Rosen] Add missing file. d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods. a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default group c47e294 [Josh Rosen] Remove StatusAPI mixin trait. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40eb8b6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40eb8b6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40eb8b6e Branch: refs/heads/master Commit: 40eb8b6ef3a67e36d0d9492c044981a1da76351d Parents: cbddac2 Author: Josh Rosen joshro...@databricks.com Authored: Fri Nov 14 23:46:25 2014 -0800 Committer: Reynold Xin r...@databricks.com Committed: Fri Nov 14 23:46:25 2014 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 68 - .../scala/org/apache/spark/SparkStatusAPI.scala | 142 --- .../org/apache/spark/SparkStatusTracker.scala | 107 ++ .../spark/api/java/JavaSparkContext.scala | 21 +-- .../spark/api/java/JavaSparkStatusTracker.scala | 72 ++ .../scala/org/apache/spark/StatusAPISuite.scala | 78 -- .../org/apache/spark/StatusTrackerSuite.scala | 89 .../spark/examples/JavaStatusAPIDemo.java | 70 - .../spark/examples/JavaStatusTrackerDemo.java | 70 + 9 files changed, 407 insertions(+), 310 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40eb8b6e/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03ea672..65edeef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger import java.util.UUID.randomUUID import scala.collection.{Map, Set} +import scala.collection.JavaConversions._ import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} @@ -61,7 +62,7 @@ import org.apache.spark.util._ * this config overrides the default configs as well as system properties. 
*/ -class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { +class SparkContext(config: SparkConf) extends Logging { // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It @@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { private[spark] val jobProgressListener = new JobProgressListener(conf) listenerBus.addListener(jobProgressListener) + val statusTracker = new SparkStatusTracker(this) + // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean(spark.ui.enabled, true)) { @@ -1002,6 +1005,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { def version = SPARK_VERSION /** + * Return a map from the slave to the max memory available for caching and the remaining + * memory available for caching. + */ + def getExecutorMemoryStatus: Map[String, (Long, Long)] = { +env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) = + (blockManagerId.host + : +