spark git commit: [SPARK-5563] [MLLIB] LDA with online variational inference
Repository: spark Updated Branches: refs/heads/master 9646018bb -> 3539cb7d2 [SPARK-5563] [MLLIB] LDA with online variational inference JIRA: https://issues.apache.org/jira/browse/SPARK-5563 The PR contains the implementation for [Online LDA](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf) based on the research of Matt Hoffman and David M. Blei, which provides an efficient option for LDA users. The algorithm's major advantages are streaming compatibility and economical time/memory consumption, since the corpus is processed in mini-batches. For more details, please refer to the JIRA. Online LDA can act as a fast option for LDA, and will be especially helpful for users who need a quick result or who work with a large corpus. Correctness test: I have tested the current PR against https://github.com/Blei-Lab/onlineldavb and the results are identical. I've uploaded the result and code to https://github.com/hhbyyh/LDACrossValidation. Author: Yuhao Yang hhb...@gmail.com Author: Joseph K. Bradley jos...@databricks.com Closes #4419 from hhbyyh/ldaonline and squashes the following commits: 1045eec [Yuhao Yang] Merge pull request #2 from jkbradley/hhbyyh-ldaonline2 cf376ff [Joseph K. Bradley] For private vars needed for testing, I made them private and added accessors. Java doesn't understand package-private tags, so this minimizes the issues Java users might encounter. 6149ca6 [Yuhao Yang] fix for setOptimizer cf0007d [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline 54cf8da [Yuhao Yang] some style change 68c2318 [Yuhao Yang] add a java ut 4041723 [Yuhao Yang] add ut 138bfed [Yuhao Yang] Merge pull request #1 from jkbradley/hhbyyh-ldaonline-update 9e910d9 [Joseph K. Bradley] small fix 61d60df [Joseph K. Bradley] Minor cleanups: * Update *Concentration parameter documentation * EM Optimizer: createVertices() does not need to be a function * OnlineLDAOptimizer: typos in doc * Clean up the core code for online LDA (Scala style) a996a82 [Yuhao Yang] respond to comments b1178cf [Yuhao Yang] fit into the optimizer framework dbe3cff [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline 15be071 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline b29193b [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline d19ef55 [Yuhao Yang] change OnlineLDA to class 97b9e1a [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline e7bf3b0 [Yuhao Yang] move to separate file f367cc9 [Yuhao Yang] change to optimization 8cb16a6 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline 62405cc [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline 02d0373 [Yuhao Yang] fix style in comment f6d47ca [Yuhao Yang] Merge branch 'ldaonline' of https://github.com/hhbyyh/spark into ldaonline d86cdec [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline a570c9a [Yuhao Yang] use sample to pick up batch 4a3f27e [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline e271eb1 [Yuhao Yang] remove non ascii 581c623 [Yuhao Yang] separate API and adjust batch split 37af91a [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline 20328d1 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline aa365d1 [Yuhao Yang] merge upstream master 3a06526 [Yuhao Yang] merge with new example 0dd3947 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline 0d0f3ee [Yuhao Yang] replace random split with sliding fa408a8 [Yuhao Yang]
Merge remote-tracking branch 'upstream/master' into ldaonline 45884ab [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline f41c5ca [Yuhao Yang] style fix 26dca1b [Yuhao Yang] style fix and make class private 043e786 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaonline Conflicts: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala d640d9c [Yuhao Yang] online lda initial checkin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3539cb7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3539cb7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3539cb7d Branch: refs/heads/master Commit: 3539cb7d20f5f878132407ec3b854011b183b2ad Parents: 9646018 Author: Yuhao Yang hhb...@gmail.com Authored: Mon May 4 00:06:25 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Mon May 4 00:06:25 2015 -0700 -- .../org/apache/spark/mllib/clustering/LDA.scala | 65 ++-- .../spark/mllib/clustering/LDAOptimizer.scala | 320 +-- .../spark/mllib/clustering/JavaLDASuite.java| 38 ++- .../spark/mllib/clustering/LDASuite.scala | 89 +- 4 files changed, 438 insertions(+), 74 deletions(-)
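For readers who want to try the new optimizer, here is a minimal sketch of how it might be invoked through the public MLlib API added by this patch. The two-document corpus and the local-mode SparkContext are illustrative assumptions, and the exact optimizer setter names may differ between releases; only `LDA`, `OnlineLDAOptimizer`, and `run(corpus)` come from the patch itself.

~~~scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vectors

// Illustrative sketch: run LDA with the new online variational optimizer
// instead of the default EM optimizer.
val sc = new SparkContext(new SparkConf().setAppName("OnlineLDAExample").setMaster("local[2]"))

// Toy corpus of (document id, term-count vector); a real corpus would be
// built from tokenized documents and a fixed vocabulary.
val corpus = sc.parallelize(Seq(
  (0L, Vectors.dense(1.0, 2.0, 0.0)),
  (1L, Vectors.dense(0.0, 3.0, 1.0))
))

val ldaModel = new LDA()
  .setK(2)
  .setOptimizer(new OnlineLDAOptimizer())  // online variational inference
  .run(corpus)

println(s"Estimated topics matrix:\n${ldaModel.topicsMatrix}")
~~~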
spark git commit: [SPARK-5100] [SQL] add webui for thriftserver
Repository: spark Updated Branches: refs/heads/master 3539cb7d2 - 343d3bfaf [SPARK-5100] [SQL] add webui for thriftserver This PR is a rebased version of #3946 , and mainly focused on creating an independent tab for the thrift server in spark web UI. Features: 1. Session related statistics ( username and IP are only supported in hive-0.13.1 ) 2. List all the SQL executing or executed on this server 3. Provide links to the job generated by SQL 4. Provide link to show all SQL executing or executed in a specified session Prototype snapshots: This is the main page for thrift server ![image](https://cloud.githubusercontent.com/assets/1411869/7361379/df7dcc64-ed89-11e4-9964-4df0b32f475e.png) Author: tianyi tianyi.asiai...@gmail.com Closes #5730 from tianyi/SPARK-5100 and squashes the following commits: cfd14c7 [tianyi] style fix 0efe3d5 [tianyi] revert part of pom change c0f2fa0 [tianyi] extends HiveThriftJdbcTest to start/stop thriftserver for UI test aa20408 [tianyi] fix style problem c9df6f9 [tianyi] add testsuite for thriftserver ui and fix some style issue 9830199 [tianyi] add webui for thriftserver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/343d3bfa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/343d3bfa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/343d3bfa Branch: refs/heads/master Commit: 343d3bfafd449a0371feb6a88f78e07302fa7143 Parents: 3539cb7 Author: tianyi tianyi.asiai...@gmail.com Authored: Mon May 4 16:59:34 2015 +0800 Committer: Cheng Lian l...@databricks.com Committed: Mon May 4 16:59:34 2015 +0800 -- .../scala/org/apache/spark/sql/SQLConf.scala| 2 + sql/hive-thriftserver/pom.xml | 12 ++ .../hive/thriftserver/HiveThriftServer2.scala | 161 ++- .../hive/thriftserver/ui/ThriftServerPage.scala | 190 ++ .../ui/ThriftServerSessionPage.scala| 197 +++ .../hive/thriftserver/ui/ThriftServerTab.scala | 50 + .../thriftserver/HiveThriftServer2Suites.scala | 12 +- .../sql/hive/thriftserver/UISeleniumSuite.scala | 105 ++ .../spark/sql/hive/thriftserver/Shim12.scala| 18 +- .../spark/sql/hive/thriftserver/Shim13.scala| 26 ++- 10 files changed, 751 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 2fa602a..99db959 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -52,6 +52,8 @@ private[spark] object SQLConf { // This is only used for the thriftserver val THRIFTSERVER_POOL = spark.sql.thriftserver.scheduler.pool + val THRIFTSERVER_UI_STATEMENT_LIMIT = spark.sql.thriftserver.ui.retainedStatements + val THRIFTSERVER_UI_SESSION_LIMIT = spark.sql.thriftserver.ui.retainedSessions // This is used to set the default data source val DEFAULT_DATA_SOURCE_NAME = spark.sql.sources.default http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/pom.xml -- diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index f38c796..437f697 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -57,6 +57,18 @@ groupId${hive.group}/groupId artifactIdhive-beeline/artifactId /dependency +!-- Added for selenium: -- +dependency + groupIdorg.seleniumhq.selenium/groupId + artifactIdselenium-java/artifactId + scopetest/scope + 
exclusions +exclusion + groupIdio.netty/groupId + artifactIdnetty/artifactId +/exclusion + /exclusions +/dependency /dependencies build outputDirectorytarget/scala-${scala.binary.version}/classes/outputDirectory http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 832596f..0be5a92 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -22,20
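The thrift server tab keeps recently finished sessions and statements in memory, bounded by the two configuration keys added to SQLConf above. Below is a hedged sketch of setting them from application code; in practice they would more likely be passed as --conf options when launching sbin/start-thriftserver.sh, and the values shown are arbitrary.

~~~scala
import org.apache.spark.SparkConf

// Illustrative only: cap how many finished statements and sessions the
// Thrift server web UI tab retains for display.
val conf = new SparkConf()
  .set("spark.sql.thriftserver.ui.retainedStatements", "200")
  .set("spark.sql.thriftserver.ui.retainedSessions", "200")
~~~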
Git Push Summary
Repository: spark Updated Tags: refs/tags/branch-1.4 [created] 343d3bfaf
Git Push Summary
Repository: spark Updated Tags: refs/tags/branch-1.4 [deleted] 343d3bfaf
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.2.2-rc1 [deleted] 7531b50e4
Git Push Summary
Repository: spark Updated Branches: refs/heads/branch-1.4 [created] 343d3bfaf
Git Push Summary
Repository: spark Updated Tags: refs/tags/branch-1.4 [created] 343d3bfaf
Git Push Summary
Repository: spark Updated Tags: refs/tags/branch-1.4 [deleted] 343d3bfaf
spark git commit: [MINOR] Fix python test typo?
Repository: spark Updated Branches: refs/heads/master 343d3bfaf -> 5a1a1075a [MINOR] Fix python test typo? I suspect we haven't been using Anaconda in tests in a while. I wonder if this change actually does anything, but this line as it stands looks strictly less correct. Author: Andrew Or and...@databricks.com Closes #5883 from andrewor14/fix-run-tests-typo and squashes the following commits: a3ad720 [Andrew Or] Fix typo? Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a1a1075 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a1a1075 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a1a1075 Branch: refs/heads/master Commit: 5a1a1075a607be683f008ef92fa227803370c45f Parents: 343d3bf Author: Andrew Or and...@databricks.com Authored: Mon May 4 17:17:55 2015 +0100 Committer: Patrick Wendell patr...@databricks.com Committed: Mon May 4 17:17:55 2015 +0100 -- dev/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5a1a1075/dev/run-tests -- diff --git a/dev/run-tests b/dev/run-tests index 861d167..05c63bc 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -236,7 +236,7 @@ echo = CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS # add path for python 3 in jenkins -export PATH="${PATH}:/home/anaonda/envs/py3k/bin" +export PATH="${PATH}:/home/anaconda/envs/py3k/bin" ./python/run-tests echo
[3/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.
[SPARK-5956] [MLLIB] Pipeline components should be copyable. This PR added `copy(extra: ParamMap): Params` to `Params`, which makes a copy of the current instance with a randomly generated uid and some extra param values. With this change, we only need to implement `fit` and `transform` without extra param values given the default implementation of `fit(dataset, extra)`: ~~~scala def fit(dataset: DataFrame, extra: ParamMap): Model = { copy(extra).fit(dataset) } ~~~ Inside `fit` and `transform`, since only the embedded values are used, I added `$` as an alias for `getOrDefault` to make the code easier to read. For example, in `LinearRegression.fit` we have: ~~~scala val effectiveRegParam = $(regParam) / yStd val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam ~~~ Meta-algorithms like `Pipeline` implement their own `copy(extra)`, so the fitted pipeline model stores copies of all stages (whether each stage is a transformer or a model). Other changes: * `Params$.inheritValues` is moved to `Params#copyValues` and returns the target instance. * `fittingParamMap` was removed because the `parent` carries this information. * `validate` was renamed to `validateParams` to be more precise. TODOs: * [x] add tests for newly added methods * [ ] update documentation jkbradley dbtsai Author: Xiangrui Meng m...@databricks.com Closes #5820 from mengxr/SPARK-5956 and squashes the following commits: 7bef88d [Xiangrui Meng] address comments 05229c3 [Xiangrui Meng] assert -> assertEquals b2927b1 [Xiangrui Meng] organize imports f14456b [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5956 93e7924 [Xiangrui Meng] add tests for hasParam copy 463ecae [Xiangrui Meng] merge master 2b954c3 [Xiangrui Meng] update Binarizer 465dd12 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5956 282a1a8 [Xiangrui Meng] fix test 819dd2d [Xiangrui Meng] merge master b642872 [Xiangrui Meng] example code runs 5a67779 [Xiangrui Meng] examples compile c76b4d1 [Xiangrui Meng] fix all unit tests 0f4fd64 [Xiangrui Meng] fix some tests 9286a22 [Xiangrui Meng] copyValues to trained models 53e0973 [Xiangrui Meng] move inheritValues to Params and rename it to copyValues 9ee004e [Xiangrui Meng] merge copy and copyWith; rename validate to validateParams d882afc [Xiangrui Meng] test compile f082a31 [Xiangrui Meng] make Params copyable and simplify handling of extra params in all spark.ml components Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0833c59 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0833c59 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0833c59 Branch: refs/heads/master Commit: e0833c5958bbd73ff27cfe6865648d7b6e5a99bc Parents: 5a1a107 Author: Xiangrui Meng m...@databricks.com Authored: Mon May 4 11:28:59 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Mon May 4 11:28:59 2015 -0700 -- .../examples/ml/JavaDeveloperApiExample.java| 24 ++--- .../examples/ml/JavaSimpleParamsExample.java| 4 +- .../spark/examples/ml/DecisionTreeExample.scala | 6 +- .../spark/examples/ml/DeveloperApiExample.scala | 22 ++-- .../apache/spark/examples/ml/GBTExample.scala | 4 +- .../spark/examples/ml/RandomForestExample.scala | 6 +- .../spark/examples/ml/SimpleParamsExample.scala | 4 +- .../scala/org/apache/spark/ml/Estimator.scala | 26 +++-- .../scala/org/apache/spark/ml/Evaluator.scala | 20 +++-
.../main/scala/org/apache/spark/ml/Model.scala | 9 +- .../scala/org/apache/spark/ml/Pipeline.scala| 106 +-- .../scala/org/apache/spark/ml/Transformer.scala | 46 +--- .../spark/ml/classification/Classifier.scala| 49 +++-- .../classification/DecisionTreeClassifier.scala | 29 ++--- .../spark/ml/classification/GBTClassifier.scala | 33 +++--- .../ml/classification/LogisticRegression.scala | 58 +- .../ProbabilisticClassifier.scala | 33 ++ .../classification/RandomForestClassifier.scala | 31 +++--- .../BinaryClassificationEvaluator.scala | 17 ++- .../org/apache/spark/ml/feature/Binarizer.scala | 20 ++-- .../org/apache/spark/ml/feature/HashingTF.scala | 10 +- .../scala/org/apache/spark/ml/feature/IDF.scala | 38 +++ .../apache/spark/ml/feature/Normalizer.scala| 10 +- .../spark/ml/feature/PolynomialExpansion.scala | 9 +- .../spark/ml/feature/StandardScaler.scala | 49 - .../apache/spark/ml/feature/StringIndexer.scala | 34 +++--- .../org/apache/spark/ml/feature/Tokenizer.scala | 18 ++-- .../spark/ml/feature/VectorAssembler.scala | 15 ++- .../apache/spark/ml/feature/VectorIndexer.scala | 74 ++--- .../org/apache/spark/ml/feature/Word2Vec.scala | 62 +--
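As a rough usage sketch of the new contract (not code from the patch): `fit(dataset)` uses the params embedded in the estimator, while `fit(dataset, extra)` copies the estimator first, so the fitted model's `parent` carries exactly the params that were used. The tiny training set and the `sqlContext` value are assumptions made for the example.

~~~scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Assumes an existing SQLContext named sqlContext.
val training = sqlContext.createDataFrame(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0))
))

val lr = new LogisticRegression().setMaxIter(10).setRegParam(1.0)
val model1 = lr.fit(training)                               // uses the embedded param values
val model2 = lr.fit(training, ParamMap(lr.regParam -> 0.1)) // default impl: copy(extra).fit(training)
assert(model2.parent.getRegParam == 0.1)                    // parent replaces the old fittingParamMap
~~~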
[2/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.
http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 9db3b29..3d78537 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -34,18 +34,17 @@ import org.apache.spark.util.collection.OpenHashMap private[feature] trait StringIndexerBase extends Params with HasInputCol with HasOutputCol { /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { -val map = extractParamMap(paramMap) -val inputColName = map(inputCol) + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputColName = $(inputCol) val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], sThe input column $inputColName must be either string type or numeric type, + sbut got $inputDataType.) val inputFields = schema.fields -val outputColName = map(outputCol) +val outputColName = $(outputCol) require(inputFields.forall(_.name != outputColName), sOutput column $outputColName already exists.) -val attr = NominalAttribute.defaultAttr.withName(map(outputCol)) +val attr = NominalAttribute.defaultAttr.withName($(outputCol)) val outputFields = inputFields :+ attr.toStructField() StructType(outputFields) } @@ -69,19 +68,16 @@ class StringIndexer extends Estimator[StringIndexerModel] with StringIndexerBase // TODO: handle unseen labels - override def fit(dataset: DataFrame, paramMap: ParamMap): StringIndexerModel = { -val map = extractParamMap(paramMap) -val counts = dataset.select(col(map(inputCol)).cast(StringType)) + override def fit(dataset: DataFrame): StringIndexerModel = { +val counts = dataset.select(col($(inputCol)).cast(StringType)) .map(_.getString(0)) .countByValue() val labels = counts.toSeq.sortBy(-_._2).map(_._1).toArray -val model = new StringIndexerModel(this, map, labels) -Params.inheritValues(map, this, model) -model +copyValues(new StringIndexerModel(this, labels)) } - override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { -validateAndTransformSchema(schema, paramMap) + override def transformSchema(schema: StructType): StructType = { +validateAndTransformSchema(schema) } } @@ -92,7 +88,6 @@ class StringIndexer extends Estimator[StringIndexerModel] with StringIndexerBase @AlphaComponent class StringIndexerModel private[ml] ( override val parent: StringIndexer, -override val fittingParamMap: ParamMap, labels: Array[String]) extends Model[StringIndexerModel] with StringIndexerBase { private val labelToIndex: OpenHashMap[String, Double] = { @@ -112,8 +107,7 @@ class StringIndexerModel private[ml] ( /** @group setParam */ def setOutputCol(value: String): this.type = set(outputCol, value) - override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { -val map = extractParamMap(paramMap) + override def transform(dataset: DataFrame): DataFrame = { val indexer = udf { label: String = if (labelToIndex.contains(label)) { labelToIndex(label) @@ -122,14 +116,14 @@ class StringIndexerModel private[ml] ( throw new SparkException(sUnseen label: $label.) 
} } -val outputColName = map(outputCol) +val outputColName = $(outputCol) val metadata = NominalAttribute.defaultAttr .withName(outputColName).withValues(labels).toMetadata() dataset.select(col(*), - indexer(dataset(map(inputCol)).cast(StringType)).as(outputColName, metadata)) + indexer(dataset($(inputCol)).cast(StringType)).as(outputColName, metadata)) } - override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { -validateAndTransformSchema(schema, paramMap) + override def transformSchema(schema: StructType): StructType = { +validateAndTransformSchema(schema) } } http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala index 01752ba..2863b76 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml.feature import
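For context, here is a short usage sketch of the simplified `StringIndexer` API shown in the diff above (illustrative, not part of the patch): `fit` and `transform` now take only a `DataFrame`, and `copyValues` carries the estimator's params into the model. The toy data and the `sqlContext` value are assumptions.

~~~scala
import org.apache.spark.ml.feature.StringIndexer

// Assumes an existing SQLContext named sqlContext.
val df = sqlContext.createDataFrame(Seq(
  (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

// The most frequent label ("a") receives index 0.0.
val indexed = indexer.fit(df).transform(df)
indexed.show()
~~~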
[3/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.
[SPARK-5956] [MLLIB] Pipeline components should be copyable. This PR added `copy(extra: ParamMap): Params` to `Params`, which makes a copy of the current instance with a randomly generated uid and some extra param values. With this change, we only need to implement `fit` and `transform` without extra param values given the default implementation of `fit(dataset, extra)`: ~~~scala def fit(dataset: DataFrame, extra: ParamMap): Model = { copy(extra).fit(dataset) } ~~~ Inside `fit` and `transform`, since only the embedded values are used, I added `$` as an alias for `getOrDefault` to make the code easier to read. For example, in `LinearRegression.fit` we have: ~~~scala val effectiveRegParam = $(regParam) / yStd val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam ~~~ Meta-algorithm like `Pipeline` implements its own `copy(extra)`. So the fitted pipeline model stored all copied stages (no matter whether it is a transformer or a model). Other changes: * `Params$.inheritValues` is moved to `Params!.copyValues` and returns the target instance. * `fittingParamMap` was removed because the `parent` carries this information. * `validate` was renamed to `validateParams` to be more precise. TODOs: * [x] add tests for newly added methods * [ ] update documentation jkbradley dbtsai Author: Xiangrui Meng m...@databricks.com Closes #5820 from mengxr/SPARK-5956 and squashes the following commits: 7bef88d [Xiangrui Meng] address comments 05229c3 [Xiangrui Meng] assert - assertEquals b2927b1 [Xiangrui Meng] organize imports f14456b [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5956 93e7924 [Xiangrui Meng] add tests for hasParam copy 463ecae [Xiangrui Meng] merge master 2b954c3 [Xiangrui Meng] update Binarizer 465dd12 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5956 282a1a8 [Xiangrui Meng] fix test 819dd2d [Xiangrui Meng] merge master b642872 [Xiangrui Meng] example code runs 5a67779 [Xiangrui Meng] examples compile c76b4d1 [Xiangrui Meng] fix all unit tests 0f4fd64 [Xiangrui Meng] fix some tests 9286a22 [Xiangrui Meng] copyValues to trained models 53e0973 [Xiangrui Meng] move inheritValues to Params and rename it to copyValues 9ee004e [Xiangrui Meng] merge copy and copyWith; rename validate to validateParams d882afc [Xiangrui Meng] test compile f082a31 [Xiangrui Meng] make Params copyable and simply handling of extra params in all spark.ml components (cherry picked from commit e0833c5958bbd73ff27cfe6865648d7b6e5a99bc) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/893b3103 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/893b3103 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/893b3103 Branch: refs/heads/branch-1.4 Commit: 893b3103fef81b5e47ece599c3989c742e12f07c Parents: 343d3bf Author: Xiangrui Meng m...@databricks.com Authored: Mon May 4 11:28:59 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Mon May 4 11:29:13 2015 -0700 -- .../examples/ml/JavaDeveloperApiExample.java| 24 ++--- .../examples/ml/JavaSimpleParamsExample.java| 4 +- .../spark/examples/ml/DecisionTreeExample.scala | 6 +- .../spark/examples/ml/DeveloperApiExample.scala | 22 ++-- .../apache/spark/examples/ml/GBTExample.scala | 4 +- .../spark/examples/ml/RandomForestExample.scala | 6 +- .../spark/examples/ml/SimpleParamsExample.scala | 4 +- 
.../scala/org/apache/spark/ml/Estimator.scala | 26 +++-- .../scala/org/apache/spark/ml/Evaluator.scala | 20 +++- .../main/scala/org/apache/spark/ml/Model.scala | 9 +- .../scala/org/apache/spark/ml/Pipeline.scala| 106 +-- .../scala/org/apache/spark/ml/Transformer.scala | 46 +--- .../spark/ml/classification/Classifier.scala| 49 +++-- .../classification/DecisionTreeClassifier.scala | 29 ++--- .../spark/ml/classification/GBTClassifier.scala | 33 +++--- .../ml/classification/LogisticRegression.scala | 58 +- .../ProbabilisticClassifier.scala | 33 ++ .../classification/RandomForestClassifier.scala | 31 +++--- .../BinaryClassificationEvaluator.scala | 17 ++- .../org/apache/spark/ml/feature/Binarizer.scala | 20 ++-- .../org/apache/spark/ml/feature/HashingTF.scala | 10 +- .../scala/org/apache/spark/ml/feature/IDF.scala | 38 +++ .../apache/spark/ml/feature/Normalizer.scala| 10 +- .../spark/ml/feature/PolynomialExpansion.scala | 9 +- .../spark/ml/feature/StandardScaler.scala | 49 - .../apache/spark/ml/feature/StringIndexer.scala | 34 +++--- .../org/apache/spark/ml/feature/Tokenizer.scala | 18 ++-- .../spark/ml/feature/VectorAssembler.scala | 15 ++-
[2/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.
http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 9db3b29..3d78537 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -34,18 +34,17 @@ import org.apache.spark.util.collection.OpenHashMap private[feature] trait StringIndexerBase extends Params with HasInputCol with HasOutputCol { /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { -val map = extractParamMap(paramMap) -val inputColName = map(inputCol) + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputColName = $(inputCol) val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], sThe input column $inputColName must be either string type or numeric type, + sbut got $inputDataType.) val inputFields = schema.fields -val outputColName = map(outputCol) +val outputColName = $(outputCol) require(inputFields.forall(_.name != outputColName), sOutput column $outputColName already exists.) -val attr = NominalAttribute.defaultAttr.withName(map(outputCol)) +val attr = NominalAttribute.defaultAttr.withName($(outputCol)) val outputFields = inputFields :+ attr.toStructField() StructType(outputFields) } @@ -69,19 +68,16 @@ class StringIndexer extends Estimator[StringIndexerModel] with StringIndexerBase // TODO: handle unseen labels - override def fit(dataset: DataFrame, paramMap: ParamMap): StringIndexerModel = { -val map = extractParamMap(paramMap) -val counts = dataset.select(col(map(inputCol)).cast(StringType)) + override def fit(dataset: DataFrame): StringIndexerModel = { +val counts = dataset.select(col($(inputCol)).cast(StringType)) .map(_.getString(0)) .countByValue() val labels = counts.toSeq.sortBy(-_._2).map(_._1).toArray -val model = new StringIndexerModel(this, map, labels) -Params.inheritValues(map, this, model) -model +copyValues(new StringIndexerModel(this, labels)) } - override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { -validateAndTransformSchema(schema, paramMap) + override def transformSchema(schema: StructType): StructType = { +validateAndTransformSchema(schema) } } @@ -92,7 +88,6 @@ class StringIndexer extends Estimator[StringIndexerModel] with StringIndexerBase @AlphaComponent class StringIndexerModel private[ml] ( override val parent: StringIndexer, -override val fittingParamMap: ParamMap, labels: Array[String]) extends Model[StringIndexerModel] with StringIndexerBase { private val labelToIndex: OpenHashMap[String, Double] = { @@ -112,8 +107,7 @@ class StringIndexerModel private[ml] ( /** @group setParam */ def setOutputCol(value: String): this.type = set(outputCol, value) - override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { -val map = extractParamMap(paramMap) + override def transform(dataset: DataFrame): DataFrame = { val indexer = udf { label: String = if (labelToIndex.contains(label)) { labelToIndex(label) @@ -122,14 +116,14 @@ class StringIndexerModel private[ml] ( throw new SparkException(sUnseen label: $label.) 
} } -val outputColName = map(outputCol) +val outputColName = $(outputCol) val metadata = NominalAttribute.defaultAttr .withName(outputColName).withValues(labels).toMetadata() dataset.select(col(*), - indexer(dataset(map(inputCol)).cast(StringType)).as(outputColName, metadata)) + indexer(dataset($(inputCol)).cast(StringType)).as(outputColName, metadata)) } - override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { -validateAndTransformSchema(schema, paramMap) + override def transformSchema(schema: StructType): StructType = { +validateAndTransformSchema(schema) } } http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala index 01752ba..2863b76 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml.feature import
[1/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.
Repository: spark Updated Branches: refs/heads/master 5a1a1075a - e0833c595 http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java index 0cc36c8..a82b86d 100644 --- a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java @@ -23,14 +23,15 @@ import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import static org.apache.spark.mllib.classification.LogisticRegressionSuite -.generateLogisticInputAsList; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; +import static org.apache.spark.mllib.classification.LogisticRegressionSuite + .generateLogisticInputAsList; public class JavaLinearRegressionSuite implements Serializable { @@ -65,8 +66,8 @@ public class JavaLinearRegressionSuite implements Serializable { DataFrame predictions = jsql.sql(SELECT label, prediction FROM prediction); predictions.collect(); // Check defaults -assert(model.getFeaturesCol().equals(features)); -assert(model.getPredictionCol().equals(prediction)); +assertEquals(features, model.getFeaturesCol()); +assertEquals(prediction, model.getPredictionCol()); } @Test @@ -76,14 +77,16 @@ public class JavaLinearRegressionSuite implements Serializable { .setMaxIter(10) .setRegParam(1.0); LinearRegressionModel model = lr.fit(dataset); -assert(model.fittingParamMap().apply(lr.maxIter()).equals(10)); -assert(model.fittingParamMap().apply(lr.regParam()).equals(1.0)); +LinearRegression parent = model.parent(); +assertEquals(10, parent.getMaxIter()); +assertEquals(1.0, parent.getRegParam(), 0.0); // Call fit() with new params, and check as many params as we can. 
LinearRegressionModel model2 = lr.fit(dataset, lr.maxIter().w(5), lr.regParam().w(0.1), lr.predictionCol().w(thePred)); -assert(model2.fittingParamMap().apply(lr.maxIter()).equals(5)); -assert(model2.fittingParamMap().apply(lr.regParam()).equals(0.1)); -assert(model2.getPredictionCol().equals(thePred)); +LinearRegression parent2 = model2.parent(); +assertEquals(5, parent2.getMaxIter()); +assertEquals(0.1, parent2.getRegParam(), 0.0); +assertEquals(thePred, model2.getPredictionCol()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java index 0bb6b48..08eeca5 100644 --- a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java @@ -68,8 +68,8 @@ public class JavaCrossValidatorSuite implements Serializable { .setEvaluator(eval) .setNumFolds(3); CrossValidatorModel cvModel = cv.fit(dataset); -ParamMap bestParamMap = cvModel.bestModel().fittingParamMap(); -Assert.assertEquals(0.001, bestParamMap.apply(lr.regParam())); -Assert.assertEquals(10, bestParamMap.apply(lr.maxIter())); +LogisticRegression parent = (LogisticRegression) cvModel.bestModel().parent(); +Assert.assertEquals(0.001, parent.getRegParam(), 0.0); +Assert.assertEquals(10, parent.getMaxIter()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala index 2f175fb..2b04a30 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala @@ -42,30 +42,32 @@ class PipelineSuite extends FunSuite { val dataset3 = mock[DataFrame] val dataset4 = mock[DataFrame] -when(estimator0.fit(meq(dataset0), any[ParamMap]())).thenReturn(model0) -when(model0.transform(meq(dataset0), any[ParamMap]())).thenReturn(dataset1) +when(estimator0.copy(any[ParamMap])).thenReturn(estimator0) +when(model0.copy(any[ParamMap])).thenReturn(model0) +
[1/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.
Repository: spark Updated Branches: refs/heads/branch-1.4 343d3bfaf - 893b3103f http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java index 0cc36c8..a82b86d 100644 --- a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java @@ -23,14 +23,15 @@ import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import static org.apache.spark.mllib.classification.LogisticRegressionSuite -.generateLogisticInputAsList; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; +import static org.apache.spark.mllib.classification.LogisticRegressionSuite + .generateLogisticInputAsList; public class JavaLinearRegressionSuite implements Serializable { @@ -65,8 +66,8 @@ public class JavaLinearRegressionSuite implements Serializable { DataFrame predictions = jsql.sql(SELECT label, prediction FROM prediction); predictions.collect(); // Check defaults -assert(model.getFeaturesCol().equals(features)); -assert(model.getPredictionCol().equals(prediction)); +assertEquals(features, model.getFeaturesCol()); +assertEquals(prediction, model.getPredictionCol()); } @Test @@ -76,14 +77,16 @@ public class JavaLinearRegressionSuite implements Serializable { .setMaxIter(10) .setRegParam(1.0); LinearRegressionModel model = lr.fit(dataset); -assert(model.fittingParamMap().apply(lr.maxIter()).equals(10)); -assert(model.fittingParamMap().apply(lr.regParam()).equals(1.0)); +LinearRegression parent = model.parent(); +assertEquals(10, parent.getMaxIter()); +assertEquals(1.0, parent.getRegParam(), 0.0); // Call fit() with new params, and check as many params as we can. 
LinearRegressionModel model2 = lr.fit(dataset, lr.maxIter().w(5), lr.regParam().w(0.1), lr.predictionCol().w(thePred)); -assert(model2.fittingParamMap().apply(lr.maxIter()).equals(5)); -assert(model2.fittingParamMap().apply(lr.regParam()).equals(0.1)); -assert(model2.getPredictionCol().equals(thePred)); +LinearRegression parent2 = model2.parent(); +assertEquals(5, parent2.getMaxIter()); +assertEquals(0.1, parent2.getRegParam(), 0.0); +assertEquals(thePred, model2.getPredictionCol()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java index 0bb6b48..08eeca5 100644 --- a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java @@ -68,8 +68,8 @@ public class JavaCrossValidatorSuite implements Serializable { .setEvaluator(eval) .setNumFolds(3); CrossValidatorModel cvModel = cv.fit(dataset); -ParamMap bestParamMap = cvModel.bestModel().fittingParamMap(); -Assert.assertEquals(0.001, bestParamMap.apply(lr.regParam())); -Assert.assertEquals(10, bestParamMap.apply(lr.maxIter())); +LogisticRegression parent = (LogisticRegression) cvModel.bestModel().parent(); +Assert.assertEquals(0.001, parent.getRegParam(), 0.0); +Assert.assertEquals(10, parent.getMaxIter()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala index 2f175fb..2b04a30 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala @@ -42,30 +42,32 @@ class PipelineSuite extends FunSuite { val dataset3 = mock[DataFrame] val dataset4 = mock[DataFrame] -when(estimator0.fit(meq(dataset0), any[ParamMap]())).thenReturn(model0) -when(model0.transform(meq(dataset0), any[ParamMap]())).thenReturn(dataset1) +when(estimator0.copy(any[ParamMap])).thenReturn(estimator0) +when(model0.copy(any[ParamMap])).thenReturn(model0) +
spark git commit: [SPARK-7319][SQL] Improve the output from DataFrame.show()
Repository: spark Updated Branches: refs/heads/master e0833c595 -> f32e69ecc [SPARK-7319][SQL] Improve the output from DataFrame.show() Author: 云峤 chensong...@alibaba-inc.com Closes #5865 from kaka1992/df.show and squashes the following commits: c79204b [云峤] Update a1338f6 [云峤] Update python dataFrame show test and add empty df unit test. 734369c [云峤] Update python dataFrame show test and add empty df unit test. 84aec3e [云峤] Update python dataFrame show test and add empty df unit test. 159b3d5 [云峤] update 03ef434 [云峤] update 7394fd5 [云峤] update test show ced487a [云峤] update pep8 b6e690b [云峤] Merge remote-tracking branch 'upstream/master' into df.show 30ac311 [云峤] [SPARK-7294] ADD BETWEEN 7d62368 [云峤] [SPARK-7294] ADD BETWEEN baf839b [云峤] [SPARK-7294] ADD BETWEEN d11d5b9 [云峤] [SPARK-7294] ADD BETWEEN Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f32e69ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f32e69ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f32e69ec Branch: refs/heads/master Commit: f32e69ecc333867fc966f65cd0aeaeddd43e0945 Parents: e0833c5 Author: 云峤 chensong...@alibaba-inc.com Authored: Mon May 4 12:08:38 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 12:08:38 2015 -0700 -- R/pkg/R/DataFrame.R | 2 +- R/pkg/inst/tests/test_sparkSQL.R| 2 +- python/pyspark/sql/dataframe.py | 105 --- .../scala/org/apache/spark/sql/DataFrame.scala | 28 +++-- .../org/apache/spark/sql/DataFrameSuite.scala | 19 5 files changed, 112 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f32e69ec/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b59b700..841e77e 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -167,7 +167,7 @@ setMethod(isLocal, setMethod(showDF, signature(x = DataFrame), function(x, numRows = 20) { -cat(callJMethod(x@sdf, showString, numToInt(numRows)), \n) +callJMethod(x@sdf, showString, numToInt(numRows)) }) #' show http://git-wip-us.apache.org/repos/asf/spark/blob/f32e69ec/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index af7a6c5..f82e56f 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -641,7 +641,7 @@ test_that(toJSON() returns an RDD of the correct values, { test_that(showDF(), { df - jsonFile(sqlCtx, jsonPath) - expect_output(showDF(df), age name \nnull Michael\n30 Andy \n19 Justin ) + expect_output(showDF(df), ++---+\n| age| name|\n++---+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n++---+\n) }) test_that(isLocal(), { http://git-wip-us.apache.org/repos/asf/spark/blob/f32e69ec/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index aac5b8c..22762c5 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -275,9 +275,12 @@ class DataFrame(object): df DataFrame[age: int, name: string] df.show() -age name -2 Alice -5 Bob ++---+-+ +|age| name| ++---+-+ +| 2|Alice| +| 5| Bob| ++---+-+ print(self._jdf.showString(n))
df.describe().show() -summary age -count 2 -mean3.5 -stddev 1.5 -min 2 -max 5 ++---+---+ +|summary|age| ++---+---+ +| count| 2| +| mean|3.5| +| stddev|1.5| +|min| 2| +|max| 5| ++---+---+ jdf = self._jdf.describe(self._jseq(cols)) return DataFrame(jdf, self.sql_ctx) @@ -801,12 +807,18 @@ class DataFrame(object): :param subset: optional list of column names to consider. df4.dropna().show() -age height name -10 80 Alice ++---+--+-+ +|age|height| name| ++---+--+-+ +| 10|80|Alice| ++---+--+-+ df4.na.drop().show() -age height name -10 80 Alice ++---+--+-+ +
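The Scala side of the change is the new `showString` method used by `show()`; below is a hedged sketch of the resulting output. The two-row DataFrame and the `sqlContext` value are assumptions; the grid format matches the updated Python docstring above.

~~~scala
// Assumes an existing SQLContext named sqlContext.
val people = sqlContext.createDataFrame(Seq((2, "Alice"), (5, "Bob"))).toDF("age", "name")
people.show()
// +---+-----+
// |age| name|
// +---+-----+
// |  2|Alice|
// |  5|  Bob|
// +---+-----+
~~~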
spark git commit: [SPARK-7266] Add ExpectsInputTypes to expressions when possible.
Repository: spark Updated Branches: refs/heads/branch-1.4 ecf0d8a9f - 1388a469b [SPARK-7266] Add ExpectsInputTypes to expressions when possible. This should gives us better analysis time error messages (rather than runtime) and automatic type casting. Author: Reynold Xin r...@databricks.com Closes #5796 from rxin/expected-input-types and squashes the following commits: c900760 [Reynold Xin] [SPARK-7266] Add ExpectsInputTypes to expressions when possible. (cherry picked from commit 678c4da0fa1bbfb6b5a0d3aced7aefa1bbbc193c) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1388a469 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1388a469 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1388a469 Branch: refs/heads/branch-1.4 Commit: 1388a469b178486eac8645e155aa9c94bae6a181 Parents: ecf0d8a Author: Reynold Xin r...@databricks.com Authored: Mon May 4 18:03:07 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 18:03:33 2015 -0700 -- .../catalyst/analysis/HiveTypeCoercion.scala| 58 ++-- .../sql/catalyst/expressions/Expression.scala | 3 +- .../sql/catalyst/expressions/arithmetic.scala | 4 +- .../sql/catalyst/expressions/predicates.scala | 22 +--- .../catalyst/expressions/stringOperations.scala | 40 -- 5 files changed, 71 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1388a469/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 73c9a1c..831fb4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -239,37 +239,43 @@ trait HiveTypeCoercion { a.makeCopy(Array(a.left, Cast(a.right, DoubleType))) // we should cast all timestamp/date/string compare into string compare - case p: BinaryPredicate if p.left.dataType == StringType - p.right.dataType == DateType = + case p: BinaryComparison if p.left.dataType == StringType + p.right.dataType == DateType = p.makeCopy(Array(p.left, Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == DateType - p.right.dataType == StringType = + case p: BinaryComparison if p.left.dataType == DateType + p.right.dataType == StringType = p.makeCopy(Array(Cast(p.left, StringType), p.right)) - case p: BinaryPredicate if p.left.dataType == StringType - p.right.dataType == TimestampType = + case p: BinaryComparison if p.left.dataType == StringType + p.right.dataType == TimestampType = p.makeCopy(Array(p.left, Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == TimestampType - p.right.dataType == StringType = + case p: BinaryComparison if p.left.dataType == TimestampType + p.right.dataType == StringType = p.makeCopy(Array(Cast(p.left, StringType), p.right)) - case p: BinaryPredicate if p.left.dataType == TimestampType - p.right.dataType == DateType = + case p: BinaryComparison if p.left.dataType == TimestampType + p.right.dataType == DateType = p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == DateType - p.right.dataType == TimestampType = + case p: BinaryComparison if p.left.dataType == DateType + 
p.right.dataType == TimestampType = p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == StringType p.right.dataType != StringType = + case p: BinaryComparison if p.left.dataType == StringType + p.right.dataType != StringType = p.makeCopy(Array(Cast(p.left, DoubleType), p.right)) - case p: BinaryPredicate if p.left.dataType != StringType p.right.dataType == StringType = + case p: BinaryComparison if p.left.dataType != StringType + p.right.dataType == StringType = p.makeCopy(Array(p.left, Cast(p.right, DoubleType))) - case i @ In(a, b) if a.dataType == DateType b.forall(_.dataType ==
spark git commit: [SPARK-7266] Add ExpectsInputTypes to expressions when possible.
Repository: spark Updated Branches: refs/heads/master 805541117 - 678c4da0f [SPARK-7266] Add ExpectsInputTypes to expressions when possible. This should gives us better analysis time error messages (rather than runtime) and automatic type casting. Author: Reynold Xin r...@databricks.com Closes #5796 from rxin/expected-input-types and squashes the following commits: c900760 [Reynold Xin] [SPARK-7266] Add ExpectsInputTypes to expressions when possible. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/678c4da0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/678c4da0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/678c4da0 Branch: refs/heads/master Commit: 678c4da0fa1bbfb6b5a0d3aced7aefa1bbbc193c Parents: 8055411 Author: Reynold Xin r...@databricks.com Authored: Mon May 4 18:03:07 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 18:03:07 2015 -0700 -- .../catalyst/analysis/HiveTypeCoercion.scala| 58 ++-- .../sql/catalyst/expressions/Expression.scala | 3 +- .../sql/catalyst/expressions/arithmetic.scala | 4 +- .../sql/catalyst/expressions/predicates.scala | 22 +--- .../catalyst/expressions/stringOperations.scala | 40 -- 5 files changed, 71 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/678c4da0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 73c9a1c..831fb4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -239,37 +239,43 @@ trait HiveTypeCoercion { a.makeCopy(Array(a.left, Cast(a.right, DoubleType))) // we should cast all timestamp/date/string compare into string compare - case p: BinaryPredicate if p.left.dataType == StringType - p.right.dataType == DateType = + case p: BinaryComparison if p.left.dataType == StringType + p.right.dataType == DateType = p.makeCopy(Array(p.left, Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == DateType - p.right.dataType == StringType = + case p: BinaryComparison if p.left.dataType == DateType + p.right.dataType == StringType = p.makeCopy(Array(Cast(p.left, StringType), p.right)) - case p: BinaryPredicate if p.left.dataType == StringType - p.right.dataType == TimestampType = + case p: BinaryComparison if p.left.dataType == StringType + p.right.dataType == TimestampType = p.makeCopy(Array(p.left, Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == TimestampType - p.right.dataType == StringType = + case p: BinaryComparison if p.left.dataType == TimestampType + p.right.dataType == StringType = p.makeCopy(Array(Cast(p.left, StringType), p.right)) - case p: BinaryPredicate if p.left.dataType == TimestampType - p.right.dataType == DateType = + case p: BinaryComparison if p.left.dataType == TimestampType + p.right.dataType == DateType = p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType))) - case p: BinaryPredicate if p.left.dataType == DateType - p.right.dataType == TimestampType = + case p: BinaryComparison if p.left.dataType == DateType + p.right.dataType == TimestampType = p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType))) - case p: 
BinaryPredicate if p.left.dataType == StringType p.right.dataType != StringType = + case p: BinaryComparison if p.left.dataType == StringType + p.right.dataType != StringType = p.makeCopy(Array(Cast(p.left, DoubleType), p.right)) - case p: BinaryPredicate if p.left.dataType != StringType p.right.dataType == StringType = + case p: BinaryComparison if p.left.dataType != StringType + p.right.dataType == StringType = p.makeCopy(Array(p.left, Cast(p.right, DoubleType))) - case i @ In(a, b) if a.dataType == DateType b.forall(_.dataType == StringType) = + case i @ In(a, b) if a.dataType == DateType + b.forall(_.dataType == StringType) =
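To illustrate the idea behind `ExpectsInputTypes` without quoting Spark internals, here is a self-contained toy sketch: an expression declares the input types it expects, and an analysis-time rule wraps mismatched children in casts, so type errors surface during analysis rather than at runtime. All class and method names below are invented for the example.

~~~scala
sealed trait DataType
case object StringType extends DataType
case object DoubleType extends DataType
case object BooleanType extends DataType

sealed trait Expr { def dataType: DataType }
case class Literal(value: Any, dataType: DataType) extends Expr
case class Cast(child: Expr, dataType: DataType) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr {
  val dataType: DataType = BooleanType
  // The "expected input types" declaration that drives automatic casting.
  val expectedInputTypes: Seq[DataType] = Seq(DoubleType, DoubleType)
}

// Analysis-time coercion: cast any child whose type does not match the declaration.
def coerce(e: GreaterThan): GreaterThan = {
  val Seq(leftType, rightType) = e.expectedInputTypes
  val l = if (e.left.dataType == leftType) e.left else Cast(e.left, leftType)
  val r = if (e.right.dataType == rightType) e.right else Cast(e.right, rightType)
  GreaterThan(l, r)
}

// "4.2" > 3.5 where the left side was parsed as a string: the string side gets cast.
val coerced = coerce(GreaterThan(Literal("4.2", StringType), Literal(3.5, DoubleType)))
println(coerced) // GreaterThan(Cast(Literal(4.2,StringType),DoubleType),Literal(3.5,DoubleType))
~~~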
spark git commit: [SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt
Repository: spark Updated Branches: refs/heads/master 678c4da0f -> 8aa5aea7f [SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt Added a check so that if `AkkaUtils.askWithReply` is on the final attempt, it will not sleep for the `retryInterval`. This should also prevent the thread from sleeping for `Int.Max` when using `askWithReply` with default values for `maxAttempts` and `retryInterval`. Author: Bryan Cutler bjcut...@us.ibm.com Closes #5896 from BryanCutler/askWithReply-sleep-7236 and squashes the following commits: 653a07b [Bryan Cutler] [SPARK-7236] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa5aea7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa5aea7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa5aea7 Branch: refs/heads/master Commit: 8aa5aea7fee0ae9cd34e16c30655ee02b8747455 Parents: 678c4da Author: Bryan Cutler bjcut...@us.ibm.com Authored: Mon May 4 18:29:22 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 18:29:22 2015 -0700 -- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8aa5aea7/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index b725df3..de3316d 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -183,7 +183,9 @@ private[spark] object AkkaUtils extends Logging { lastException = e logWarning(s"Error sending message [message = $message] in $attempts attempts", e) } - Thread.sleep(retryInterval) + if (attempts < maxAttempts) { +Thread.sleep(retryInterval) + } } throw new SparkException(
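Standalone sketch of the retry pattern after this fix (names and the logging are illustrative, not the actual `AkkaUtils` code): sleep between attempts, but never after the last one, so a caller that passes a huge `retryInterval` is not left sleeping once all attempts are exhausted.

~~~scala
// Illustrative retry helper demonstrating the added guard around the sleep.
def retry[T](maxAttempts: Int, retryInterval: Long)(op: => T): T = {
  var lastException: Exception = null
  var attempts = 0
  while (attempts < maxAttempts) {
    attempts += 1
    try {
      return op
    } catch {
      case e: Exception =>
        lastException = e
        println(s"Error in attempt $attempts: ${e.getMessage}")
    }
    if (attempts < maxAttempts) {
      // The fix: only sleep if another attempt will actually follow.
      Thread.sleep(retryInterval)
    }
  }
  throw new RuntimeException(s"Failed after $maxAttempts attempts", lastException)
}

// Usage: retry(3, 1000) { askSomething() }
~~~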
spark git commit: [SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt
Repository: spark Updated Branches: refs/heads/branch-1.4 1388a469b -> 48655d10e [SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt Added a check so that if `AkkaUtils.askWithReply` is on the final attempt, it will not sleep for the `retryInterval`. This should also prevent the thread from sleeping for `Int.Max` when using `askWithReply` with default values for `maxAttempts` and `retryInterval`. Author: Bryan Cutler bjcut...@us.ibm.com Closes #5896 from BryanCutler/askWithReply-sleep-7236 and squashes the following commits: 653a07b [Bryan Cutler] [SPARK-7236] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt (cherry picked from commit 8aa5aea7fee0ae9cd34e16c30655ee02b8747455) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48655d10 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48655d10 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48655d10 Branch: refs/heads/branch-1.4 Commit: 48655d10ede9f286140ebe6d72be04b8ea9c3d85 Parents: 1388a46 Author: Bryan Cutler bjcut...@us.ibm.com Authored: Mon May 4 18:29:22 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 18:30:02 2015 -0700 -- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/48655d10/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index b725df3..de3316d 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -183,7 +183,9 @@ private[spark] object AkkaUtils extends Logging { lastException = e logWarning(s"Error sending message [message = $message] in $attempts attempts", e) } - Thread.sleep(retryInterval) + if (attempts < maxAttempts) { +Thread.sleep(retryInterval) + } } throw new SparkException(
[6/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/d3.min.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/d3.min.js b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js new file mode 100644 index 000..30cd292 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js @@ -0,0 +1,5 @@ [new file: bundled minified d3 v3.5.5 library source, not reproduced here]
[5/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js new file mode 100644 index 000..6d2da25 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js @@ -0,0 +1,29 @@ [new file: bundled minified dagre-d3 v0.4.3 with one additional commit (see http://github.com/andrewor14/dagre-d3), MIT-licensed (Copyright (c) 2012-2013 Chris Pettitt); library source not reproduced here]
[1/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
Repository: spark Updated Branches: refs/heads/master f32e69ecc -> fc8b58195 http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index ee02fbd..3f162d1 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -28,10 +28,11 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ +import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.rdd.RDDOperationScope import org.apache.spark.scheduler._ import org.apache.spark.storage._ -import org.apache.spark._ /** * Serializes SparkListener events to/from JSON. This protocol provides strong backwards- @@ -228,6 +229,7 @@ private[spark] object JsonProtocol { def stageInfoToJson(stageInfo: StageInfo): JValue = { val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList) +val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList) val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing) val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing) @@ -236,6 +238,7 @@ private[spark] object JsonProtocol { ("Stage Name" -> stageInfo.name) ~ ("Number of Tasks" -> stageInfo.numTasks) ~ ("RDD Info" -> rddInfo) ~ +("Parent IDs" -> parentIds) ~ ("Details" -> stageInfo.details) ~ ("Submission Time" -> submissionTime) ~ ("Completion Time" -> completionTime) ~ @@ -368,8 +371,11 @@ private[spark] object JsonProtocol { def rddInfoToJson(rddInfo: RDDInfo): JValue = { val storageLevel = storageLevelToJson(rddInfo.storageLevel) +val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList) ("RDD ID" -> rddInfo.id) ~ ("Name" -> rddInfo.name) ~ +("Scope" -> rddInfo.scope.map(_.toJson)) ~ +("Parent IDs" -> parentIds) ~ ("Storage Level" -> storageLevel) ~ ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ @@ -519,7 +525,7 @@ private[spark] object JsonProtocol { // The "Stage Infos" field was added in Spark 1.2.0 val stageInfos = Utils.jsonOption(json \ "Stage Infos") .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { -stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) +stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")) } SparkListenerJobStart(jobId, submissionTime, stageInfos, properties) } @@ -599,7 +605,10 @@ private[spark] object JsonProtocol { val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) val stageName = (json \ "Stage Name").extract[String] val numTasks = (json \ "Number of Tasks").extract[Int] -val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_)) +val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson) +val parentIds = Utils.jsonOption(json \ "Parent IDs") + .map { l => l.extract[List[JValue]].map(_.extract[Int]) } + .getOrElse(Seq.empty) val details = (json \ "Details").extractOpt[String].getOrElse("") val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]) val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]) @@ -609,7 +618,8 @@ private[spark] object JsonProtocol { case None => Seq[AccumulableInfo]() } -val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details) +val stageInfo = new StageInfo( + stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason @@ -785,16 +795,22 @@ private[spark] object JsonProtocol { def rddInfoFromJson(json: JValue): RDDInfo = { val rddId = (json \ "RDD ID").extract[Int] val name = (json \ "Name").extract[String] +val scope = Utils.jsonOption(json \ "Scope") + .map(_.extract[String]) + .map(RDDOperationScope.fromJson) +val parentIds = Utils.jsonOption(json \ "Parent IDs") + .map { l => l.extract[List[JValue]].map(_.extract[Int]) } + .getOrElse(Seq.empty) val storageLevel = storageLevelFromJson(json \ "Storage Level") val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] -// fallback to tachyon for backward compatability +// fallback to tachyon for backward compatibility val
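The `Utils.jsonOption(...).getOrElse(...)` pattern above is what keeps event logs written by older Spark versions parseable: the new "Scope" and "Parent IDs" fields simply fall back to empty defaults when absent. A small stand-alone json4s sketch of the same idea follows; the helper and field handling here are illustrative, not Spark's own code.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object BackCompatJsonSketch {
  implicit val formats: Formats = DefaultFormats

  // Treat a missing field (JNothing) or an explicit null as "absent",
  // the same role Utils.jsonOption plays in JsonProtocol.
  def jsonOption(json: JValue): Option[JValue] = json match {
    case JNothing | JNull => None
    case value => Some(value)
  }

  // "Parent IDs" only exists in newer logs, so default to empty for old ones.
  def parentIds(json: JValue): Seq[Int] =
    jsonOption(json \ "Parent IDs")
      .map(_.extract[List[Int]])
      .getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    val oldEvent = parse("""{"RDD ID": 1, "Name": "someRdd"}""")
    val newEvent = parse("""{"RDD ID": 1, "Name": "someRdd", "Parent IDs": [0, 2]}""")
    println(parentIds(oldEvent)) // List()
    println(parentIds(newEvent)) // List(0, 2)
  }
}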
[4/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js new file mode 100644 index 000..037316f --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js @@ -0,0 +1,4 @@ [new file: bundled minified graphlib-dot v0.5.2 library source, not reproduced here]
[5/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js new file mode 100644 index 000..6d2da25 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js @@ -0,0 +1,29 @@ [new file: bundled minified dagre-d3 v0.4.3 with one additional commit (see http://github.com/andrewor14/dagre-d3), MIT-licensed (Copyright (c) 2012-2013 Chris Pettitt); library source not reproduced here]
[7/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
[SPARK-6943] [SPARK-6944] DAG visualization on SparkUI This patch adds the functionality to display the RDD DAG on the SparkUI. This DAG describes the relationships between - an RDD and its dependencies, - an RDD and its operation scopes, and - an RDD's operation scopes and the stage / job hierarchy An operation scope here refers to the existing public APIs that created the RDDs (e.g. `textFile`, `treeAggregate`). In the future, we can expand this to include higher level operations like SQL queries. *Note: This blatantly stole a few lines of HTML and JavaScript from #5547 (thanks shroffpradyumn!)* Here's what the job page looks like: img src=https://issues.apache.org/jira/secure/attachment/12730286/job-page.png; width=700px/ and the stage page: img src=https://issues.apache.org/jira/secure/attachment/12730287/stage-page.png; width=300px/ Author: Andrew Or and...@databricks.com Closes #5729 from andrewor14/viz2 and squashes the following commits: 666c03b [Andrew Or] Round corners of RDD boxes on stage page (minor) 01ba336 [Andrew Or] Change RDD cache color to red (minor) 6f9574a [Andrew Or] Add tests for RDDOperationScope 1c310e4 [Andrew Or] Wrap a few more RDD functions in an operation scope 3ffe566 [Andrew Or] Restore null as default for RDD name 5fdd89d [Andrew Or] children - child (minor) 0d07a84 [Andrew Or] Fix python style afb98e2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 0d7aa32 [Andrew Or] Fix python tests 3459ab2 [Andrew Or] Fix tests 832443c [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 429e9e1 [Andrew Or] Display cached RDDs on the viz b1f0fd1 [Andrew Or] Rename OperatorScope - RDDOperationScope 31aae06 [Andrew Or] Extract visualization logic from listener 83f9c58 [Andrew Or] Implement a programmatic representation of operator scopes 5a7faf4 [Andrew Or] Rename references to viz scopes to viz clusters ee33d52 [Andrew Or] Separate HTML generating code from listener f9830a2 [Andrew Or] Refactor + clean up + document JS visualization code b80cc52 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 0706992 [Andrew Or] Add link from jobs to stages deb48a0 [Andrew Or] Translate stage boxes taking into account the width 5c7ce16 [Andrew Or] Connect RDDs across stages + update style ab91416 [Andrew Or] Introduce visualization to the Job Page 5f07e9c [Andrew Or] Remove more return statements from scopes 5e388ea [Andrew Or] Fix line too long 43de96e [Andrew Or] Add parent IDs to StageInfo 6e2cfea [Andrew Or] Remove all return statements in `withScope` d19c4da [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 7ef957c [Andrew Or] Fix scala style 4310271 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 aa868a9 [Andrew Or] Ensure that HadoopRDD is actually serializable c3bfcae [Andrew Or] Re-implement scopes using closures instead of annotations 52187fc [Andrew Or] Rat excludes 09d361e [Andrew Or] Add ID to node label (minor) 71281fa [Andrew Or] Embed the viz in the UI in a toggleable manner 8dd5af2 [Andrew Or] Fill in documentation + miscellaneous minor changes fe7816f [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz 205f838 [Andrew Or] Reimplement rendering with dagre-d3 instead of viz.js 5e22946 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz 6a7cdca [Andrew Or] Move RDD scope util methods and logic to its own file 494d5c2 [Andrew Or] Revert a few unintended style changes 9fac6f3 [Andrew Or] Re-implement scopes through annotations 
instead f22f337 [Andrew Or] First working implementation of visualization with vis.js 2184348 [Andrew Or] Translate RDD information to dot file 5143523 [Andrew Or] Expose the necessary information in RDDInfo a9ed4f9 [Andrew Or] Add a few missing scopes to certain RDD methods 6b3403b [Andrew Or] Scope all RDD methods Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/863ec0cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/863ec0cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/863ec0cb Branch: refs/heads/branch-1.4 Commit: 863ec0cb4de7dc77987117b35454cf79e240b1e7 Parents: 34edaa8 Author: Andrew Or and...@databricks.com Authored: Mon May 4 16:21:36 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon May 4 16:24:35 2015 -0700 -- .rat-excludes | 3 + .../org/apache/spark/ui/static/d3.min.js| 5 + .../org/apache/spark/ui/static/dagre-d3.min.js | 29 ++ .../apache/spark/ui/static/graphlib-dot.min.js | 4 + .../org/apache/spark/ui/static/spark-dag-viz.js | 392 +++ .../org/apache/spark/ui/static/webui.css| 2 +- .../scala/org/apache/spark/SparkContext.scala | 97 +++-- .../org/apache/spark/rdd/AsyncRDDActions.scala | 10 +-
[3/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js new file mode 100644 index 000..99b0294 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file contains the logic to render the RDD DAG visualization in the UI. + * + * This DAG describes the relationships between + * (1) an RDD and its dependencies, + * (2) an RDD and its operation scopes, and + * (3) an RDD's operation scopes and the stage / job hierarchy + * + * An operation scope is a general, named code block representing an operation + * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation + * scope can be nested inside of other scopes if the corresponding RDD operation + * invokes other such operations (for more detail, see o.a.s.rdd.operationScope). + * + * A stage may include one or more operation scopes if the RDD operations are + * streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)). + * On the flip side, an operation scope may also include one or many stages, + * or even jobs if the RDD operation is higher level than Spark's scheduling + * primitives (e.g. take, any SQL query). + * + * In the visualization, an RDD is expressed as a node, and its dependencies + * as directed edges (from parent to child). operation scopes, stages, and + * jobs are expressed as clusters that may contain one or many nodes. These + * clusters may be nested inside of each other in the scenarios described + * above. + * + * The visualization is rendered in an SVG contained in div#dag-viz-graph, + * and its input data is expected to be populated in div#dag-viz-metadata + * by Spark's UI code. This is currently used only on the stage page and on + * the job page. + * + * This requires jQuery, d3, and dagre-d3. Note that we use a custom release + * of dagre-d3 (http://github.com/andrewor14/dagre-d3) for some specific + * functionality. For more detail, please track the changes in that project + * since it was forked (commit 101503833a8ce5fe369547f6addf3e71172ce10b). 
+ */ + +var VizConstants = { + rddColor: #44, + rddCachedColor: #FF, + rddOperationColor: #AADFFF, + stageColor: #FFDDEE, + clusterLabelColor: #88, + edgeColor: #44, + edgeWidth: 1.5px, + svgMarginX: 0, + svgMarginY: 20, + stageSep: 50, + graphPrefix: graph_, + nodePrefix: node_, + stagePrefix: stage_, + clusterPrefix: cluster_, + stageClusterPrefix: cluster_stage_ +}; + +// Helper d3 accessors for the elements that contain our graph and its metadata +function graphContainer() { return d3.select(#dag-viz-graph); } +function metadataContainer() { return d3.select(#dag-viz-metadata); } + +/* + * Show or hide the RDD DAG visualization. + * The graph is only rendered the first time this is called. + */ +function toggleDagViz(forJob) { + var arrowSelector = .expand-dag-viz-arrow; + $(arrowSelector).toggleClass('arrow-closed'); + $(arrowSelector).toggleClass('arrow-open'); + var shouldShow = $(arrowSelector).hasClass(arrow-open); + if (shouldShow) { +var shouldRender = graphContainer().select(svg).empty(); +if (shouldRender) { + renderDagViz(forJob); +} +graphContainer().style(display, block); + } else { +// Save the graph for later so we don't have to render it again +graphContainer().style(display, none); + } +} + +/* + * Render the RDD DAG visualization. + * + * Input DOM hierarchy: + * div#dag-viz-metadata + * div.stage-metadata + * div.[dot-file | incoming-edge | outgoing-edge] + * + * Output DOM hierarchy: + * div#dag-viz-graph + * svg + * g#cluster_stage_[stageId] + * + * Note that the input metadata is populated by o.a.s.ui.UIUtils.showDagViz. + * Any changes in the input format here must be reflected there. + */ +function renderDagViz(forJob) { + + // If there is not a dot file to render, fail fast and report
[1/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
Repository: spark Updated Branches: refs/heads/branch-1.4 34edaa8ac -> 863ec0cb4 http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index ee02fbd..3f162d1 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -28,10 +28,11 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ +import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.rdd.RDDOperationScope import org.apache.spark.scheduler._ import org.apache.spark.storage._ -import org.apache.spark._ /** * Serializes SparkListener events to/from JSON. This protocol provides strong backwards- @@ -228,6 +229,7 @@ private[spark] object JsonProtocol { def stageInfoToJson(stageInfo: StageInfo): JValue = { val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList) +val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList) val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing) val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing) @@ -236,6 +238,7 @@ private[spark] object JsonProtocol { ("Stage Name" -> stageInfo.name) ~ ("Number of Tasks" -> stageInfo.numTasks) ~ ("RDD Info" -> rddInfo) ~ +("Parent IDs" -> parentIds) ~ ("Details" -> stageInfo.details) ~ ("Submission Time" -> submissionTime) ~ ("Completion Time" -> completionTime) ~ @@ -368,8 +371,11 @@ private[spark] object JsonProtocol { def rddInfoToJson(rddInfo: RDDInfo): JValue = { val storageLevel = storageLevelToJson(rddInfo.storageLevel) +val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList) ("RDD ID" -> rddInfo.id) ~ ("Name" -> rddInfo.name) ~ +("Scope" -> rddInfo.scope.map(_.toJson)) ~ +("Parent IDs" -> parentIds) ~ ("Storage Level" -> storageLevel) ~ ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ @@ -519,7 +525,7 @@ private[spark] object JsonProtocol { // The "Stage Infos" field was added in Spark 1.2.0 val stageInfos = Utils.jsonOption(json \ "Stage Infos") .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { -stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) +stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")) } SparkListenerJobStart(jobId, submissionTime, stageInfos, properties) } @@ -599,7 +605,10 @@ private[spark] object JsonProtocol { val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) val stageName = (json \ "Stage Name").extract[String] val numTasks = (json \ "Number of Tasks").extract[Int] -val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_)) +val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson) +val parentIds = Utils.jsonOption(json \ "Parent IDs") + .map { l => l.extract[List[JValue]].map(_.extract[Int]) } + .getOrElse(Seq.empty) val details = (json \ "Details").extractOpt[String].getOrElse("") val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]) val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]) @@ -609,7 +618,8 @@ private[spark] object JsonProtocol { case None => Seq[AccumulableInfo]() } -val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details) +val stageInfo = new StageInfo( + stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason @@ -785,16 +795,22 @@ private[spark] object JsonProtocol { def rddInfoFromJson(json: JValue): RDDInfo = { val rddId = (json \ "RDD ID").extract[Int] val name = (json \ "Name").extract[String] +val scope = Utils.jsonOption(json \ "Scope") + .map(_.extract[String]) + .map(RDDOperationScope.fromJson) +val parentIds = Utils.jsonOption(json \ "Parent IDs") + .map { l => l.extract[List[JValue]].map(_.extract[Int]) } + .getOrElse(Seq.empty) val storageLevel = storageLevelFromJson(json \ "Storage Level") val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] -// fallback to tachyon for backward compatability +// fallback to tachyon for backward compatibility
[6/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/d3.min.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/d3.min.js b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js new file mode 100644 index 000..30cd292 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js @@ -0,0 +1,5 @@ [new file: bundled minified d3 v3.5.5 library source, not reproduced here]
[7/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
[SPARK-6943] [SPARK-6944] DAG visualization on SparkUI This patch adds the functionality to display the RDD DAG on the SparkUI. This DAG describes the relationships between - an RDD and its dependencies, - an RDD and its operation scopes, and - an RDD's operation scopes and the stage / job hierarchy An operation scope here refers to the existing public APIs that created the RDDs (e.g. `textFile`, `treeAggregate`). In the future, we can expand this to include higher level operations like SQL queries. *Note: This blatantly stole a few lines of HTML and JavaScript from #5547 (thanks shroffpradyumn!)* Here's what the job page looks like: img src=https://issues.apache.org/jira/secure/attachment/12730286/job-page.png; width=700px/ and the stage page: img src=https://issues.apache.org/jira/secure/attachment/12730287/stage-page.png; width=300px/ Author: Andrew Or and...@databricks.com Closes #5729 from andrewor14/viz2 and squashes the following commits: 666c03b [Andrew Or] Round corners of RDD boxes on stage page (minor) 01ba336 [Andrew Or] Change RDD cache color to red (minor) 6f9574a [Andrew Or] Add tests for RDDOperationScope 1c310e4 [Andrew Or] Wrap a few more RDD functions in an operation scope 3ffe566 [Andrew Or] Restore null as default for RDD name 5fdd89d [Andrew Or] children - child (minor) 0d07a84 [Andrew Or] Fix python style afb98e2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 0d7aa32 [Andrew Or] Fix python tests 3459ab2 [Andrew Or] Fix tests 832443c [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 429e9e1 [Andrew Or] Display cached RDDs on the viz b1f0fd1 [Andrew Or] Rename OperatorScope - RDDOperationScope 31aae06 [Andrew Or] Extract visualization logic from listener 83f9c58 [Andrew Or] Implement a programmatic representation of operator scopes 5a7faf4 [Andrew Or] Rename references to viz scopes to viz clusters ee33d52 [Andrew Or] Separate HTML generating code from listener f9830a2 [Andrew Or] Refactor + clean up + document JS visualization code b80cc52 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 0706992 [Andrew Or] Add link from jobs to stages deb48a0 [Andrew Or] Translate stage boxes taking into account the width 5c7ce16 [Andrew Or] Connect RDDs across stages + update style ab91416 [Andrew Or] Introduce visualization to the Job Page 5f07e9c [Andrew Or] Remove more return statements from scopes 5e388ea [Andrew Or] Fix line too long 43de96e [Andrew Or] Add parent IDs to StageInfo 6e2cfea [Andrew Or] Remove all return statements in `withScope` d19c4da [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 7ef957c [Andrew Or] Fix scala style 4310271 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2 aa868a9 [Andrew Or] Ensure that HadoopRDD is actually serializable c3bfcae [Andrew Or] Re-implement scopes using closures instead of annotations 52187fc [Andrew Or] Rat excludes 09d361e [Andrew Or] Add ID to node label (minor) 71281fa [Andrew Or] Embed the viz in the UI in a toggleable manner 8dd5af2 [Andrew Or] Fill in documentation + miscellaneous minor changes fe7816f [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz 205f838 [Andrew Or] Reimplement rendering with dagre-d3 instead of viz.js 5e22946 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz 6a7cdca [Andrew Or] Move RDD scope util methods and logic to its own file 494d5c2 [Andrew Or] Revert a few unintended style changes 9fac6f3 [Andrew Or] Re-implement scopes through annotations 
instead f22f337 [Andrew Or] First working implementation of visualization with vis.js 2184348 [Andrew Or] Translate RDD information to dot file 5143523 [Andrew Or] Expose the necessary information in RDDInfo a9ed4f9 [Andrew Or] Add a few missing scopes to certain RDD methods 6b3403b [Andrew Or] Scope all RDD methods Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc8b5819 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc8b5819 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc8b5819 Branch: refs/heads/master Commit: fc8b58195afa67fbb75b4c8303e022f703cbf007 Parents: f32e69e Author: Andrew Or and...@databricks.com Authored: Mon May 4 16:21:36 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon May 4 16:21:36 2015 -0700 -- .rat-excludes | 3 + .../org/apache/spark/ui/static/d3.min.js| 5 + .../org/apache/spark/ui/static/dagre-d3.min.js | 29 ++ .../apache/spark/ui/static/graphlib-dot.min.js | 4 + .../org/apache/spark/ui/static/spark-dag-viz.js | 392 +++ .../org/apache/spark/ui/static/webui.css| 2 +- .../scala/org/apache/spark/SparkContext.scala | 97 +++-- .../org/apache/spark/rdd/AsyncRDDActions.scala | 10 +- .../apache/spark/rdd/DoubleRDDFunctions.scala
[2/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 31c07c7..7f7c7ed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -25,7 +25,7 @@ import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus -import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text} +import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.TextOutputFormat @@ -277,12 +277,20 @@ abstract class RDD[T: ClassTag]( if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) } + /** + * Execute a block of code in a scope such that all new RDDs created in this body will + * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}. + * + * Note: Return statements are NOT allowed in the given body. + */ + private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body) + // Transformations (return a new RDD) /** * Return a new RDD by applying a function to all elements of this RDD. */ - def map[U: ClassTag](f: T => U): RDD[U] = { + def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } @@ -291,7 +299,7 @@ abstract class RDD[T: ClassTag]( * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = { + def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) } @@ -299,7 +307,7 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing only the elements that satisfy a predicate. */ - def filter(f: T => Boolean): RDD[T] = { + def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, @@ -310,13 +318,16 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = + def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) + } /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(): RDD[T] = distinct(partitions.length) + def distinct(): RDD[T] = withScope { +distinct(partitions.length) + } /** * Return a new RDD that has exactly numPartitions partitions. @@ -327,7 +338,7 @@ abstract class RDD[T: ClassTag]( * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. */ - def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = { + def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } @@ -352,7 +363,7 @@ abstract class RDD[T: ClassTag]( * data distributed using a hash partitioner. */ def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) - : RDD[T] = { + : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { @@ -377,16 +388,17 @@ abstract class RDD[T: ClassTag]( /** * Return a sampled subset of this RDD. - * + * * @param withReplacement can elements be sampled multiple times (replaced when sampled out) * @param fraction expected size of the sample as a fraction of this RDD's size * without replacement: probability that each element is chosen; fraction must be [0, 1] * with replacement: expected number of times each element is chosen; fraction must be >= 0 * @param seed seed for the random number generator */ - def sample(withReplacement: Boolean, + def sample( + withReplacement: Boolean, fraction: Double, - seed: Long = Utils.random.nextLong): RDD[T] = { + seed: Long = Utils.random.nextLong): RDD[T] = withScope { require(fraction >= 0.0, "Negative fraction value: " + fraction) if (withReplacement) { new PartitionwiseSampledRDD[T, T](this, new
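Every public transformation above is now wrapped in `withScope`, which records the operation that created each RDD so the UI can group RDDs into named clusters. A simplified, self-contained sketch of that closure-based scoping idea follows; Spark's real RDDOperationScope additionally handles nesting, JSON serialization, and derives the scope name from the calling method, whereas here the name is passed explicitly and all identifiers are illustrative.

object ScopeSketch {
  // The scope currently active on this thread while a withScope block runs.
  private val currentScope = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  // Run `body` with `name` as the active scope, restoring the previous scope
  // afterwards -- the same closure-based shape as RDDOperationScope.withScope.
  def withScope[T](name: String)(body: => T): T = {
    val previous = currentScope.get()
    currentScope.set(Some(name))
    try body finally currentScope.set(previous)
  }

  // A toy stand-in for an RDD that records the scope active when it was created;
  // the DAG visualization groups RDDs sharing a scope into one cluster.
  case class FakeRDD(label: String, scope: Option[String])

  def textFile(path: String): FakeRDD =
    withScope("textFile") { FakeRDD(path, currentScope.get()) }

  def main(args: Array[String]): Unit = {
    println(textFile("data.txt")) // FakeRDD(data.txt,Some(textFile))
  }
}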
[4/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js new file mode 100644 index 000..037316f --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js @@ -0,0 +1,4 @@ [new file: bundled minified graphlib-dot v0.5.2 library source, not reproduced here]
spark git commit: [SPARK-7243][SQL] Contingency Tables for DataFrames
Repository: spark Updated Branches: refs/heads/branch-1.4 863ec0cb4 - ecf0d8a9f [SPARK-7243][SQL] Contingency Tables for DataFrames Computes a pair-wise frequency table of the given columns. Also known as cross-tabulation. cc mengxr rxin Author: Burak Yavuz brk...@gmail.com Closes #5842 from brkyvz/df-cont and squashes the following commits: a07c01e [Burak Yavuz] addressed comments v4.1 ae9e01d [Burak Yavuz] fix test 9106585 [Burak Yavuz] addressed comments v4.0 bced829 [Burak Yavuz] fix merge conflicts a63ad00 [Burak Yavuz] addressed comments v3.0 a0cad97 [Burak Yavuz] addressed comments v3.0 6805df8 [Burak Yavuz] addressed comments and fixed test 939b7c4 [Burak Yavuz] lint python 7f098bc [Burak Yavuz] add crosstab pyTest fd53b00 [Burak Yavuz] added python support for crosstab 27a5a81 [Burak Yavuz] implemented crosstab (cherry picked from commit 80554111703c08e2bedbe303e04ecd162ec119e1) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf0d8a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf0d8a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf0d8a9 Branch: refs/heads/branch-1.4 Commit: ecf0d8a9f1eaf157433483090571ceaaee0b3f2e Parents: 863ec0c Author: Burak Yavuz brk...@gmail.com Authored: Mon May 4 17:02:49 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 17:03:03 2015 -0700 -- python/pyspark/sql/dataframe.py | 25 + python/pyspark/sql/tests.py | 9 .../spark/sql/DataFrameStatFunctions.scala | 37 + .../sql/execution/stat/StatFunctions.scala | 37 +++-- .../apache/spark/sql/JavaDataFrameSuite.java| 28 ++ .../apache/spark/sql/DataFrameStatSuite.scala | 55 +--- 6 files changed, 160 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ecf0d8a9/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 22762c5..f30a92d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -931,6 +931,26 @@ class DataFrame(object): raise ValueError("col2 should be a string.") return self._jdf.stat().cov(col1, col2) +def crosstab(self, col1, col2): +""" +Computes a pair-wise frequency table of the given columns. Also known as a contingency +table. The number of distinct values for each column should be less than 1e4. The first +column of each row will be the distinct values of `col1` and the column names will be the +distinct values of `col2`. The name of the first column will be `$col1_$col2`. Pairs that +have no occurrences will have `null` as their counts. +:func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` are aliases. + +:param col1: The name of the first column. Distinct items will make the first item of +each row. +:param col2: The name of the second column. Distinct items will make the column names +of the DataFrame. +""" +if not isinstance(col1, str): +raise ValueError("col1 should be a string.") +if not isinstance(col2, str): +raise ValueError("col2 should be a string.") +return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx) + def freqItems(self, cols, support=None): Finding frequent items for columns, possibly with false positives.
Using the @@ -1423,6 +1443,11 @@ class DataFrameStatFunctions(object): cov.__doc__ = DataFrame.cov.__doc__ +def crosstab(self, col1, col2): +return self.df.crosstab(col1, col2) + +crosstab.__doc__ = DataFrame.crosstab.__doc__ + def freqItems(self, cols, support=None): return self.df.freqItems(cols, support) http://git-wip-us.apache.org/repos/asf/spark/blob/ecf0d8a9/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d652c30..7ea6656 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -405,6 +405,15 @@ class SQLTests(ReusedPySparkTestCase): cov = df.stat.cov("a", "b") self.assertTrue(abs(cov - 55.0 / 3) < 1e-6) +def test_crosstab(self): +df = self.sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]).toDF() +ct = df.stat.crosstab("a", "b").collect() +ct = sorted(ct, key=lambda x: x[0]) +for i, row in enumerate(ct): +self.assertEqual(row[0], str(i)) +self.assertTrue(row[1], 1) +
spark git commit: [SPARK-7243][SQL] Contingency Tables for DataFrames
Repository: spark Updated Branches: refs/heads/master fc8b58195 - 805541117 [SPARK-7243][SQL] Contingency Tables for DataFrames Computes a pair-wise frequency table of the given columns. Also known as cross-tabulation. cc mengxr rxin Author: Burak Yavuz brk...@gmail.com Closes #5842 from brkyvz/df-cont and squashes the following commits: a07c01e [Burak Yavuz] addressed comments v4.1 ae9e01d [Burak Yavuz] fix test 9106585 [Burak Yavuz] addressed comments v4.0 bced829 [Burak Yavuz] fix merge conflicts a63ad00 [Burak Yavuz] addressed comments v3.0 a0cad97 [Burak Yavuz] addressed comments v3.0 6805df8 [Burak Yavuz] addressed comments and fixed test 939b7c4 [Burak Yavuz] lint python 7f098bc [Burak Yavuz] add crosstab pyTest fd53b00 [Burak Yavuz] added python support for crosstab 27a5a81 [Burak Yavuz] implemented crosstab Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80554111 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80554111 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80554111 Branch: refs/heads/master Commit: 80554111703c08e2bedbe303e04ecd162ec119e1 Parents: fc8b581 Author: Burak Yavuz brk...@gmail.com Authored: Mon May 4 17:02:49 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 17:02:49 2015 -0700 -- python/pyspark/sql/dataframe.py | 25 + python/pyspark/sql/tests.py | 9 .../spark/sql/DataFrameStatFunctions.scala | 37 + .../sql/execution/stat/StatFunctions.scala | 37 +++-- .../apache/spark/sql/JavaDataFrameSuite.java| 28 ++ .../apache/spark/sql/DataFrameStatSuite.scala | 55 +--- 6 files changed, 160 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80554111/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 22762c5..f30a92d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -931,6 +931,26 @@ class DataFrame(object): raise ValueError("col2 should be a string.") return self._jdf.stat().cov(col1, col2) +def crosstab(self, col1, col2): +""" +Computes a pair-wise frequency table of the given columns. Also known as a contingency +table. The number of distinct values for each column should be less than 1e4. The first +column of each row will be the distinct values of `col1` and the column names will be the +distinct values of `col2`. The name of the first column will be `$col1_$col2`. Pairs that +have no occurrences will have `null` as their counts. +:func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` are aliases. + +:param col1: The name of the first column. Distinct items will make the first item of +each row. +:param col2: The name of the second column. Distinct items will make the column names +of the DataFrame. +""" +if not isinstance(col1, str): +raise ValueError("col1 should be a string.") +if not isinstance(col2, str): +raise ValueError("col2 should be a string.") +return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx) + def freqItems(self, cols, support=None): Finding frequent items for columns, possibly with false positives.
Using the @@ -1423,6 +1443,11 @@ class DataFrameStatFunctions(object): cov.__doc__ = DataFrame.cov.__doc__ +def crosstab(self, col1, col2): +return self.df.crosstab(col1, col2) + +crosstab.__doc__ = DataFrame.crosstab.__doc__ + def freqItems(self, cols, support=None): return self.df.freqItems(cols, support) http://git-wip-us.apache.org/repos/asf/spark/blob/80554111/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d652c30..7ea6656 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -405,6 +405,15 @@ class SQLTests(ReusedPySparkTestCase): cov = df.stat.cov("a", "b") self.assertTrue(abs(cov - 55.0 / 3) < 1e-6) +def test_crosstab(self): +df = self.sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]).toDF() +ct = df.stat.crosstab("a", "b").collect() +ct = sorted(ct, key=lambda x: x[0]) +for i, row in enumerate(ct): +self.assertEqual(row[0], str(i)) +self.assertTrue(row[1], 1) +self.assertTrue(row[2], 1) + def test_math_functions(self): df = self.sc.parallelize([Row(a=i, b=2 * i)
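For readers who want to try the new API directly, here is a minimal PySpark sketch of crosstab usage, mirroring the test_crosstab case above. It is illustrative only: the SparkContext/SQLContext setup and the app name are assumptions, and the column names "a" and "b" simply follow the test data.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="crosstab-example")  # hypothetical app name
sqlContext = SQLContext(sc)

# Six rows with a = i % 3 and b = i % 2, as in the unit test above.
df = sqlContext.createDataFrame([Row(a=i % 3, b=i % 2) for i in range(1, 7)])

# Pair-wise frequency (contingency) table: one row per distinct value of "a",
# one column per distinct value of "b"; the first column is named "a_b".
df.stat.crosstab("a", "b").show()
# Each (a, b) pair occurs exactly once in this data, so every count is 1:
# +---+---+---+
# |a_b|  0|  1|
# +---+---+---+
# |  0|  1|  1|
# |  1|  1|  1|
# |  2|  1|  1|
# +---+---+---+
# (row order is not guaranteed; pairs that never occur would show null)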
spark git commit: [SPARK-7319][SQL] Improve the output from DataFrame.show()
Repository: spark Updated Branches: refs/heads/branch-1.4 893b3103f - 34edaa8ac [SPARK-7319][SQL] Improve the output from DataFrame.show() Author: 云峤 chensong...@alibaba-inc.com Closes #5865 from kaka1992/df.show and squashes the following commits: c79204b [云峤] Update a1338f6 [云峤] Update python dataFrame show test and add empty df unit test. 734369c [云峤] Update python dataFrame show test and add empty df unit test. 84aec3e [云峤] Update python dataFrame show test and add empty df unit test. 159b3d5 [云峤] update 03ef434 [云峤] update 7394fd5 [云峤] update test show ced487a [云峤] update pep8 b6e690b [云峤] Merge remote-tracking branch 'upstream/master' into df.show 30ac311 [云峤] [SPARK-7294] ADD BETWEEN 7d62368 [云峤] [SPARK-7294] ADD BETWEEN baf839b [云峤] [SPARK-7294] ADD BETWEEN d11d5b9 [云峤] [SPARK-7294] ADD BETWEEN (cherry picked from commit f32e69ecc333867fc966f65cd0aeaeddd43e0945) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34edaa8a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34edaa8a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34edaa8a Branch: refs/heads/branch-1.4 Commit: 34edaa8ac2334258961b290adb29d540233ee2bf Parents: 893b310 Author: 云峤 chensong...@alibaba-inc.com Authored: Mon May 4 12:08:38 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon May 4 13:24:52 2015 -0700 -- R/pkg/R/DataFrame.R | 2 +- R/pkg/inst/tests/test_sparkSQL.R| 2 +- python/pyspark/sql/dataframe.py | 105 --- .../scala/org/apache/spark/sql/DataFrame.scala | 28 +++-- .../org/apache/spark/sql/DataFrameSuite.scala | 19 5 files changed, 112 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34edaa8a/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b59b700..841e77e 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -167,7 +167,7 @@ setMethod("isLocal", setMethod("showDF", signature(x = "DataFrame"), function(x, numRows = 20) { -cat(callJMethod(x@sdf, "showString", numToInt(numRows)), "\n") +callJMethod(x@sdf, "showString", numToInt(numRows)) }) #' show http://git-wip-us.apache.org/repos/asf/spark/blob/34edaa8a/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index af7a6c5..f82e56f 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -641,7 +641,7 @@ test_that("toJSON() returns an RDD of the correct values", { test_that("showDF()", { df <- jsonFile(sqlCtx, jsonPath) - expect_output(showDF(df), "age name \nnull Michael\n30 Andy \n19 Justin ") + expect_output(showDF(df), "+----+-------+\n| age|   name|\n+----+-------+\n|null|Michael|\n|  30|   Andy|\n|  19| Justin|\n+----+-------+\n") }) test_that("isLocal()", { http://git-wip-us.apache.org/repos/asf/spark/blob/34edaa8a/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index aac5b8c..22762c5 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -275,9 +275,12 @@ class DataFrame(object): >>> df DataFrame[age: int, name: string] >>> df.show() -age name -2 Alice -5 Bob ++---+-----+ +|age| name| ++---+-----+ +|  2|Alice| +|  5|  Bob| ++---+-----+ print(self._jdf.showString(n)) @@ -591,12 +594,15 @@ class DataFrame(object): given, this function computes statistics for all numerical columns.
>>> df.describe().show() -summary age -count 2 -mean 3.5 -stddev 1.5 -min 2 -max 5 ++-------+---+ +|summary|age| ++-------+---+ +|  count|  2| +|   mean|3.5| +| stddev|1.5| +|    min|  2| +|    max|  5| ++-------+---+ jdf = self._jdf.describe(self._jseq(cols)) return DataFrame(jdf, self.sql_ctx) @@ -801,12 +807,18 @@ class DataFrame(object): :param subset: optional list of column names to consider. >>> df4.dropna().show() -age height name -10 80 Alice ++---+------+-----+ +|age|height| name| ++---+------+-----+ +| 10|    80|Alice| ++---+------+-----+
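To make the change concrete, here is a small sketch of the new DataFrame.show() output format introduced by this commit (a bordered ASCII grid instead of whitespace-aligned columns). The SparkContext/SQLContext setup and the sample rows are assumptions; the expected table follows the updated docstring above.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="show-example")  # hypothetical app name
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")])

# show() now prints a bordered grid; previously it printed plain aligned text.
df.show()
# +---+-----+
# |age| name|
# +---+-----+
# |  2|Alice|
# |  5|  Bob|
# +---+-----+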