spark git commit: [SPARK-5563] [MLLIB] LDA with online variational inference

2015-05-04 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 9646018bb -> 3539cb7d2


[SPARK-5563] [MLLIB] LDA with online variational inference

JIRA: https://issues.apache.org/jira/browse/SPARK-5563
This PR contains an implementation of [Online LDA](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf), based on the research of Matt Hoffman and David M. Blei, which provides an efficient option for LDA users. The algorithm's major advantages are streaming compatibility and modest time/memory consumption, since the corpus is processed in mini-batches. For more details, please refer to the JIRA.

Online LDA acts as a fast option for LDA and is especially helpful for users who need a quick result or have a large corpus.
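
A minimal usage sketch (the toy corpus, parameter values, and the string-based optimizer selection below are illustrative assumptions, with `sc` an existing SparkContext):

~~~scala
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Toy corpus: (documentId, term-count vector over a 5-term vocabulary).
val corpus = sc.parallelize(Seq(
  (0L, Vectors.dense(1.0, 2.0, 0.0, 0.0, 1.0)),
  (1L, Vectors.dense(0.0, 1.0, 3.0, 1.0, 0.0)),
  (2L, Vectors.dense(2.0, 0.0, 0.0, 1.0, 1.0))))

val ldaModel = new LDA()
  .setK(3)
  .setMaxIterations(20)
  .setOptimizer("online")  // use the online variational optimizer instead of EM
  .run(corpus)

println(ldaModel.topicsMatrix)  // vocabSize x k matrix of topic-term weights
~~~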

Correctness test:
I have tested the current PR against https://github.com/Blei-Lab/onlineldavb and the results are identical. I've uploaded the results and code to https://github.com/hhbyyh/LDACrossValidation.

Author: Yuhao Yang hhb...@gmail.com
Author: Joseph K. Bradley jos...@databricks.com

Closes #4419 from hhbyyh/ldaonline and squashes the following commits:

1045eec [Yuhao Yang] Merge pull request #2 from jkbradley/hhbyyh-ldaonline2
cf376ff [Joseph K. Bradley] For private vars needed for testing, I made them 
private and added accessors.  Java doesn’t understand package-private tags, 
so this minimizes the issues Java users might encounter.
6149ca6 [Yuhao Yang] fix for setOptimizer
cf0007d [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
54cf8da [Yuhao Yang] some style change
68c2318 [Yuhao Yang] add a java ut
4041723 [Yuhao Yang] add ut
138bfed [Yuhao Yang] Merge pull request #1 from 
jkbradley/hhbyyh-ldaonline-update
9e910d9 [Joseph K. Bradley] small fix
61d60df [Joseph K. Bradley] Minor cleanups: * Update *Concentration parameter 
documentation * EM Optimizer: createVertices() does not need to be a function * 
OnlineLDAOptimizer: typos in doc * Clean up the core code for online LDA (Scala 
style)
a996a82 [Yuhao Yang] respond to comments
b1178cf [Yuhao Yang] fit into the optimizer framework
dbe3cff [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
15be071 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
b29193b [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
d19ef55 [Yuhao Yang] change OnlineLDA to class
97b9e1a [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
e7bf3b0 [Yuhao Yang] move to seperate file
f367cc9 [Yuhao Yang] change to optimization
8cb16a6 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
62405cc [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
02d0373 [Yuhao Yang] fix style in comment
f6d47ca [Yuhao Yang] Merge branch 'ldaonline' of 
https://github.com/hhbyyh/spark into ldaonline
d86cdec [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
a570c9a [Yuhao Yang] use sample to pick up batch
4a3f27e [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline
e271eb1 [Yuhao Yang] remove non ascii
581c623 [Yuhao Yang] seperate API and adjust batch split
37af91a [Yuhao Yang] iMerge remote-tracking branch 'upstream/master' into 
ldaonline
20328d1 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline i
aa365d1 [Yuhao Yang] merge upstream master
3a06526 [Yuhao Yang] merge with new example
0dd3947 [Yuhao Yang] kMerge remote-tracking branch 'upstream/master' into 
ldaonline
0d0f3ee [Yuhao Yang] replace random split with sliding
fa408a8 [Yuhao Yang] ssMerge remote-tracking branch 'upstream/master' into 
ldaonline
45884ab [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline s
f41c5ca [Yuhao Yang] style fix
26dca1b [Yuhao Yang] style fix and make class private
043e786 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaonline s Conflicts: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
d640d9c [Yuhao Yang] online lda initial checkin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3539cb7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3539cb7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3539cb7d

Branch: refs/heads/master
Commit: 3539cb7d20f5f878132407ec3b854011b183b2ad
Parents: 9646018
Author: Yuhao Yang hhb...@gmail.com
Authored: Mon May 4 00:06:25 2015 -0700
Committer: Joseph K. Bradley jos...@databricks.com
Committed: Mon May 4 00:06:25 2015 -0700

--
 .../org/apache/spark/mllib/clustering/LDA.scala |  65 ++--
 .../spark/mllib/clustering/LDAOptimizer.scala   | 320 +--
 .../spark/mllib/clustering/JavaLDASuite.java|  38 ++-
 .../spark/mllib/clustering/LDASuite.scala   |  89 +-
 4 files changed, 438 insertions(+), 74 deletions(-)

spark git commit: [SPARK-5100] [SQL] add webui for thriftserver

2015-05-04 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 3539cb7d2 -> 343d3bfaf


[SPARK-5100] [SQL] add webui for thriftserver

This PR is a rebased version of #3946, and mainly focuses on creating an 
independent tab for the Thrift server in the Spark web UI.

Features:

1. Session-related statistics (username and IP are only supported with Hive 0.13.1)
2. List all SQL statements executing or executed on this server (the number retained in the UI is configurable; see the sketch after this list)
3. Provide links to the jobs generated by each SQL statement
4. Provide a link to show all SQL statements executing or executed in a specified session
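
The statement/session retention mentioned above maps to the two SQLConf keys added in the diff below; a minimal tuning sketch (the values are illustrative assumptions, and in practice they would normally be set via spark-defaults.conf or --conf when starting the Thrift server):

~~~scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // How many SQL statements the Thrift server tab keeps for display.
  .set("spark.sql.thriftserver.ui.retainedStatements", "200")
  // How many JDBC/ODBC sessions the Thrift server tab keeps for display.
  .set("spark.sql.thriftserver.ui.retainedSessions", "200")
~~~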

Prototype snapshots:

This is the main page for the Thrift server:

![image](https://cloud.githubusercontent.com/assets/1411869/7361379/df7dcc64-ed89-11e4-9964-4df0b32f475e.png)

Author: tianyi tianyi.asiai...@gmail.com

Closes #5730 from tianyi/SPARK-5100 and squashes the following commits:

cfd14c7 [tianyi] style fix
0efe3d5 [tianyi] revert part of pom change
c0f2fa0 [tianyi] extends HiveThriftJdbcTest to start/stop thriftserver for UI 
test
aa20408 [tianyi] fix style problem
c9df6f9 [tianyi] add testsuite for thriftserver ui and fix some style issue
9830199 [tianyi] add webui for thriftserver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/343d3bfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/343d3bfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/343d3bfa

Branch: refs/heads/master
Commit: 343d3bfafd449a0371feb6a88f78e07302fa7143
Parents: 3539cb7
Author: tianyi tianyi.asiai...@gmail.com
Authored: Mon May 4 16:59:34 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Mon May 4 16:59:34 2015 +0800

--
 .../scala/org/apache/spark/sql/SQLConf.scala|   2 +
 sql/hive-thriftserver/pom.xml   |  12 ++
 .../hive/thriftserver/HiveThriftServer2.scala   | 161 ++-
 .../hive/thriftserver/ui/ThriftServerPage.scala | 190 ++
 .../ui/ThriftServerSessionPage.scala| 197 +++
 .../hive/thriftserver/ui/ThriftServerTab.scala  |  50 +
 .../thriftserver/HiveThriftServer2Suites.scala  |  12 +-
 .../sql/hive/thriftserver/UISeleniumSuite.scala | 105 ++
 .../spark/sql/hive/thriftserver/Shim12.scala|  18 +-
 .../spark/sql/hive/thriftserver/Shim13.scala|  26 ++-
 10 files changed, 751 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2fa602a..99db959 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -52,6 +52,8 @@ private[spark] object SQLConf {
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+  val THRIFTSERVER_UI_STATEMENT_LIMIT = "spark.sql.thriftserver.ui.retainedStatements"
+  val THRIFTSERVER_UI_SESSION_LIMIT = "spark.sql.thriftserver.ui.retainedSessions"
 
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/pom.xml
--
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index f38c796..437f697 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -57,6 +57,18 @@
      <groupId>${hive.group}</groupId>
      <artifactId>hive-beeline</artifactId>
    </dependency>
+    <!-- Added for selenium: -->
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-java</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 832596f..0be5a92 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -22,20 

Git Push Summary

2015-05-04 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/branch-1.4 [created] 343d3bfaf

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2015-05-04 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/branch-1.4 [deleted] 343d3bfaf

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2015-05-04 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.2.2-rc1 [deleted] 7531b50e4

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2015-05-04 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 [created] 343d3bfaf

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2015-05-04 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/branch-1.4 [created] 343d3bfaf

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2015-05-04 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/branch-1.4 [deleted] 343d3bfaf

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [MINOR] Fix python test typo?

2015-05-04 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 343d3bfaf -> 5a1a1075a


[MINOR] Fix python test typo?

I suspect we haven't been using Anaconda in tests in a while. I wonder if this 
change actually does anything, but the line as it stands looks strictly less 
correct.

Author: Andrew Or and...@databricks.com

Closes #5883 from andrewor14/fix-run-tests-typo and squashes the following 
commits:

a3ad720 [Andrew Or] Fix typo?


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a1a1075
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a1a1075
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a1a1075

Branch: refs/heads/master
Commit: 5a1a1075a607be683f008ef92fa227803370c45f
Parents: 343d3bf
Author: Andrew Or and...@databricks.com
Authored: Mon May 4 17:17:55 2015 +0100
Committer: Patrick Wendell patr...@databricks.com
Committed: Mon May 4 17:17:55 2015 +0100

--
 dev/run-tests | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5a1a1075/dev/run-tests
--
diff --git a/dev/run-tests b/dev/run-tests
index 861d167..05c63bc 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -236,7 +236,7 @@ echo 
=
 CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS
 
 # add path for python 3 in jenkins
-export PATH="${PATH}:/home/anaonda/envs/py3k/bin"
+export PATH="${PATH}:/home/anaconda/envs/py3k/bin"
 ./python/run-tests
 
 echo 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[3/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.

2015-05-04 Thread meng
[SPARK-5956] [MLLIB] Pipeline components should be copyable.

This PR added `copy(extra: ParamMap): Params` to `Params`, which makes a copy 
of the current instance with a randomly generated uid and some extra param 
values. With this change, we only need to implement `fit` and `transform` 
without extra param values given the default implementation of `fit(dataset, 
extra)`:

~~~scala
def fit(dataset: DataFrame, extra: ParamMap): Model = {
  copy(extra).fit(dataset)
}
~~~

Inside `fit` and `transform`, since only the embedded values are used, I added 
`$` as an alias for `getOrDefault` to make the code easier to read. For 
example, in `LinearRegression.fit` we have:

~~~scala
val effectiveRegParam = $(regParam) / yStd
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
~~~

A meta-algorithm like `Pipeline` implements its own `copy(extra)`, so the fitted 
pipeline model stores all copied stages (whether each stage is a transformer 
or a model).

Other changes:
* `Params$.inheritValues` is moved to `Params!.copyValues`, which returns the target instance (see the sketch after this list).
* `fittingParamMap` was removed because the `parent` carries this information.
* `validate` was renamed to `validateParams` to be more precise.

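A simplified, self-contained sketch of the `copyValues` idea (not the actual spark.ml `Params` API; the trait, signatures, and string-keyed params here are assumptions made up for illustration):

~~~scala
trait SimpleParams {
  val paramValues = scala.collection.mutable.Map.empty[String, Any]

  def set(name: String, value: Any): this.type = { paramValues(name) = value; this }

  // Copy this instance's param values (plus any extras) onto the target,
  // then return the target so callers can write copyValues(new Model(...)).
  def copyValues[T <: SimpleParams](to: T, extra: Map[String, Any] = Map.empty): T = {
    (paramValues ++ extra).foreach { case (k, v) => to.set(k, v) }
    to
  }
}

class MyModel extends SimpleParams
class MyEstimator extends SimpleParams {
  def fit(): MyModel = copyValues(new MyModel)  // the trained model inherits the params
}
~~~
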
TODOs:
* [x] add tests for newly added methods
* [ ] update documentation

jkbradley dbtsai

Author: Xiangrui Meng m...@databricks.com

Closes #5820 from mengxr/SPARK-5956 and squashes the following commits:

7bef88d [Xiangrui Meng] address comments
05229c3 [Xiangrui Meng] assert -> assertEquals
b2927b1 [Xiangrui Meng] organize imports
f14456b [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into 
SPARK-5956
93e7924 [Xiangrui Meng] add tests for hasParam & copy
463ecae [Xiangrui Meng] merge master
2b954c3 [Xiangrui Meng] update Binarizer
465dd12 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into 
SPARK-5956
282a1a8 [Xiangrui Meng] fix test
819dd2d [Xiangrui Meng] merge master
b642872 [Xiangrui Meng] example code runs
5a67779 [Xiangrui Meng] examples compile
c76b4d1 [Xiangrui Meng] fix all unit tests
0f4fd64 [Xiangrui Meng] fix some tests
9286a22 [Xiangrui Meng] copyValues to trained models
53e0973 [Xiangrui Meng] move inheritValues to Params and rename it to copyValues
9ee004e [Xiangrui Meng] merge copy and copyWith; rename validate to 
validateParams
d882afc [Xiangrui Meng] test compile
f082a31 [Xiangrui Meng] make Params copyable and simply handling of extra 
params in all spark.ml components


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0833c59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0833c59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0833c59

Branch: refs/heads/master
Commit: e0833c5958bbd73ff27cfe6865648d7b6e5a99bc
Parents: 5a1a107
Author: Xiangrui Meng m...@databricks.com
Authored: Mon May 4 11:28:59 2015 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon May 4 11:28:59 2015 -0700

--
 .../examples/ml/JavaDeveloperApiExample.java|  24 ++---
 .../examples/ml/JavaSimpleParamsExample.java|   4 +-
 .../spark/examples/ml/DecisionTreeExample.scala |   6 +-
 .../spark/examples/ml/DeveloperApiExample.scala |  22 ++--
 .../apache/spark/examples/ml/GBTExample.scala   |   4 +-
 .../spark/examples/ml/RandomForestExample.scala |   6 +-
 .../spark/examples/ml/SimpleParamsExample.scala |   4 +-
 .../scala/org/apache/spark/ml/Estimator.scala   |  26 +++--
 .../scala/org/apache/spark/ml/Evaluator.scala   |  20 +++-
 .../main/scala/org/apache/spark/ml/Model.scala  |   9 +-
 .../scala/org/apache/spark/ml/Pipeline.scala| 106 +--
 .../scala/org/apache/spark/ml/Transformer.scala |  46 +---
 .../spark/ml/classification/Classifier.scala|  49 +++--
 .../classification/DecisionTreeClassifier.scala |  29 ++---
 .../spark/ml/classification/GBTClassifier.scala |  33 +++---
 .../ml/classification/LogisticRegression.scala  |  58 +-
 .../ProbabilisticClassifier.scala   |  33 ++
 .../classification/RandomForestClassifier.scala |  31 +++---
 .../BinaryClassificationEvaluator.scala |  17 ++-
 .../org/apache/spark/ml/feature/Binarizer.scala |  20 ++--
 .../org/apache/spark/ml/feature/HashingTF.scala |  10 +-
 .../scala/org/apache/spark/ml/feature/IDF.scala |  38 +++
 .../apache/spark/ml/feature/Normalizer.scala|  10 +-
 .../spark/ml/feature/PolynomialExpansion.scala  |   9 +-
 .../spark/ml/feature/StandardScaler.scala   |  49 -
 .../apache/spark/ml/feature/StringIndexer.scala |  34 +++---
 .../org/apache/spark/ml/feature/Tokenizer.scala |  18 ++--
 .../spark/ml/feature/VectorAssembler.scala  |  15 ++-
 .../apache/spark/ml/feature/VectorIndexer.scala |  74 ++---
 .../org/apache/spark/ml/feature/Word2Vec.scala  |  62 +--
 

[2/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.

2015-05-04 Thread meng
http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 9db3b29..3d78537 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -34,18 +34,17 @@ import org.apache.spark.util.collection.OpenHashMap
 private[feature] trait StringIndexerBase extends Params with HasInputCol with 
HasOutputCol {
 
   /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {
-val map = extractParamMap(paramMap)
-val inputColName = map(inputCol)
+  protected def validateAndTransformSchema(schema: StructType): StructType = {
+val inputColName = $(inputCol)
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or numeric type, " +
     s"but got $inputDataType.")
 val inputFields = schema.fields
-val outputColName = map(outputCol)
+val outputColName = $(outputCol)
 require(inputFields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName(map(outputCol))
+val attr = NominalAttribute.defaultAttr.withName($(outputCol))
 val outputFields = inputFields :+ attr.toStructField()
 StructType(outputFields)
   }
@@ -69,19 +68,16 @@ class StringIndexer extends Estimator[StringIndexerModel] 
with StringIndexerBase
 
   // TODO: handle unseen labels
 
-  override def fit(dataset: DataFrame, paramMap: ParamMap): StringIndexerModel 
= {
-val map = extractParamMap(paramMap)
-val counts = dataset.select(col(map(inputCol)).cast(StringType))
+  override def fit(dataset: DataFrame): StringIndexerModel = {
+val counts = dataset.select(col($(inputCol)).cast(StringType))
   .map(_.getString(0))
   .countByValue()
 val labels = counts.toSeq.sortBy(-_._2).map(_._1).toArray
-val model = new StringIndexerModel(this, map, labels)
-Params.inheritValues(map, this, model)
-model
+copyValues(new StringIndexerModel(this, labels))
   }
 
-  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
-validateAndTransformSchema(schema, paramMap)
+  override def transformSchema(schema: StructType): StructType = {
+validateAndTransformSchema(schema)
   }
 }
 
@@ -92,7 +88,6 @@ class StringIndexer extends Estimator[StringIndexerModel] 
with StringIndexerBase
 @AlphaComponent
 class StringIndexerModel private[ml] (
 override val parent: StringIndexer,
-override val fittingParamMap: ParamMap,
 labels: Array[String]) extends Model[StringIndexerModel] with 
StringIndexerBase {
 
   private val labelToIndex: OpenHashMap[String, Double] = {
@@ -112,8 +107,7 @@ class StringIndexerModel private[ml] (
   /** @group setParam */
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
-val map = extractParamMap(paramMap)
+  override def transform(dataset: DataFrame): DataFrame = {
 val indexer = udf { label: String =>
   if (labelToIndex.contains(label)) {
 labelToIndex(label)
@@ -122,14 +116,14 @@ class StringIndexerModel private[ml] (
 throw new SparkException(s"Unseen label: $label.")
   }
 }
-val outputColName = map(outputCol)
+val outputColName = $(outputCol)
 val metadata = NominalAttribute.defaultAttr
   .withName(outputColName).withValues(labels).toMetadata()
 dataset.select(col("*"),
-  indexer(dataset(map(inputCol)).cast(StringType)).as(outputColName, 
metadata))
+  indexer(dataset($(inputCol)).cast(StringType)).as(outputColName, 
metadata))
   }
 
-  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
-validateAndTransformSchema(schema, paramMap)
+  override def transformSchema(schema: StructType): StructType = {
+validateAndTransformSchema(schema)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
index 01752ba..2863b76 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.feature
 import 

[3/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.

2015-05-04 Thread meng
[SPARK-5956] [MLLIB] Pipeline components should be copyable.

This PR added `copy(extra: ParamMap): Params` to `Params`, which makes a copy 
of the current instance with a randomly generated uid and some extra param 
values. With this change, we only need to implement `fit` and `transform` 
without extra param values given the default implementation of `fit(dataset, 
extra)`:

~~~scala
def fit(dataset: DataFrame, extra: ParamMap): Model = {
  copy(extra).fit(dataset)
}
~~~

Inside `fit` and `transform`, since only the embedded values are used, I added 
`$` as an alias for `getOrDefault` to make the code easier to read. For 
example, in `LinearRegression.fit` we have:

~~~scala
val effectiveRegParam = $(regParam) / yStd
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
~~~

A meta-algorithm like `Pipeline` implements its own `copy(extra)`, so the fitted 
pipeline model stores all copied stages (whether each stage is a transformer 
or a model).

Other changes:
* `Params$.inheritValues` is moved to `Params!.copyValues` and returns the 
target instance.
* `fittingParamMap` was removed because the `parent` carries this information.
* `validate` was renamed to `validateParams` to be more precise.

TODOs:
* [x] add tests for newly added methods
* [ ] update documentation

jkbradley dbtsai

Author: Xiangrui Meng m...@databricks.com

Closes #5820 from mengxr/SPARK-5956 and squashes the following commits:

7bef88d [Xiangrui Meng] address comments
05229c3 [Xiangrui Meng] assert -> assertEquals
b2927b1 [Xiangrui Meng] organize imports
f14456b [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into 
SPARK-5956
93e7924 [Xiangrui Meng] add tests for hasParam & copy
463ecae [Xiangrui Meng] merge master
2b954c3 [Xiangrui Meng] update Binarizer
465dd12 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into 
SPARK-5956
282a1a8 [Xiangrui Meng] fix test
819dd2d [Xiangrui Meng] merge master
b642872 [Xiangrui Meng] example code runs
5a67779 [Xiangrui Meng] examples compile
c76b4d1 [Xiangrui Meng] fix all unit tests
0f4fd64 [Xiangrui Meng] fix some tests
9286a22 [Xiangrui Meng] copyValues to trained models
53e0973 [Xiangrui Meng] move inheritValues to Params and rename it to copyValues
9ee004e [Xiangrui Meng] merge copy and copyWith; rename validate to 
validateParams
d882afc [Xiangrui Meng] test compile
f082a31 [Xiangrui Meng] make Params copyable and simply handling of extra 
params in all spark.ml components

(cherry picked from commit e0833c5958bbd73ff27cfe6865648d7b6e5a99bc)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/893b3103
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/893b3103
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/893b3103

Branch: refs/heads/branch-1.4
Commit: 893b3103fef81b5e47ece599c3989c742e12f07c
Parents: 343d3bf
Author: Xiangrui Meng m...@databricks.com
Authored: Mon May 4 11:28:59 2015 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon May 4 11:29:13 2015 -0700

--
 .../examples/ml/JavaDeveloperApiExample.java|  24 ++---
 .../examples/ml/JavaSimpleParamsExample.java|   4 +-
 .../spark/examples/ml/DecisionTreeExample.scala |   6 +-
 .../spark/examples/ml/DeveloperApiExample.scala |  22 ++--
 .../apache/spark/examples/ml/GBTExample.scala   |   4 +-
 .../spark/examples/ml/RandomForestExample.scala |   6 +-
 .../spark/examples/ml/SimpleParamsExample.scala |   4 +-
 .../scala/org/apache/spark/ml/Estimator.scala   |  26 +++--
 .../scala/org/apache/spark/ml/Evaluator.scala   |  20 +++-
 .../main/scala/org/apache/spark/ml/Model.scala  |   9 +-
 .../scala/org/apache/spark/ml/Pipeline.scala| 106 +--
 .../scala/org/apache/spark/ml/Transformer.scala |  46 +---
 .../spark/ml/classification/Classifier.scala|  49 +++--
 .../classification/DecisionTreeClassifier.scala |  29 ++---
 .../spark/ml/classification/GBTClassifier.scala |  33 +++---
 .../ml/classification/LogisticRegression.scala  |  58 +-
 .../ProbabilisticClassifier.scala   |  33 ++
 .../classification/RandomForestClassifier.scala |  31 +++---
 .../BinaryClassificationEvaluator.scala |  17 ++-
 .../org/apache/spark/ml/feature/Binarizer.scala |  20 ++--
 .../org/apache/spark/ml/feature/HashingTF.scala |  10 +-
 .../scala/org/apache/spark/ml/feature/IDF.scala |  38 +++
 .../apache/spark/ml/feature/Normalizer.scala|  10 +-
 .../spark/ml/feature/PolynomialExpansion.scala  |   9 +-
 .../spark/ml/feature/StandardScaler.scala   |  49 -
 .../apache/spark/ml/feature/StringIndexer.scala |  34 +++---
 .../org/apache/spark/ml/feature/Tokenizer.scala |  18 ++--
 .../spark/ml/feature/VectorAssembler.scala  |  15 ++-
 

[2/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.

2015-05-04 Thread meng
http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 9db3b29..3d78537 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -34,18 +34,17 @@ import org.apache.spark.util.collection.OpenHashMap
 private[feature] trait StringIndexerBase extends Params with HasInputCol with 
HasOutputCol {
 
   /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {
-val map = extractParamMap(paramMap)
-val inputColName = map(inputCol)
+  protected def validateAndTransformSchema(schema: StructType): StructType = {
+val inputColName = $(inputCol)
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or numeric type, " +
     s"but got $inputDataType.")
 val inputFields = schema.fields
-val outputColName = map(outputCol)
+val outputColName = $(outputCol)
 require(inputFields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName(map(outputCol))
+val attr = NominalAttribute.defaultAttr.withName($(outputCol))
 val outputFields = inputFields :+ attr.toStructField()
 StructType(outputFields)
   }
@@ -69,19 +68,16 @@ class StringIndexer extends Estimator[StringIndexerModel] 
with StringIndexerBase
 
   // TODO: handle unseen labels
 
-  override def fit(dataset: DataFrame, paramMap: ParamMap): StringIndexerModel 
= {
-val map = extractParamMap(paramMap)
-val counts = dataset.select(col(map(inputCol)).cast(StringType))
+  override def fit(dataset: DataFrame): StringIndexerModel = {
+val counts = dataset.select(col($(inputCol)).cast(StringType))
   .map(_.getString(0))
   .countByValue()
 val labels = counts.toSeq.sortBy(-_._2).map(_._1).toArray
-val model = new StringIndexerModel(this, map, labels)
-Params.inheritValues(map, this, model)
-model
+copyValues(new StringIndexerModel(this, labels))
   }
 
-  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
-validateAndTransformSchema(schema, paramMap)
+  override def transformSchema(schema: StructType): StructType = {
+validateAndTransformSchema(schema)
   }
 }
 
@@ -92,7 +88,6 @@ class StringIndexer extends Estimator[StringIndexerModel] 
with StringIndexerBase
 @AlphaComponent
 class StringIndexerModel private[ml] (
 override val parent: StringIndexer,
-override val fittingParamMap: ParamMap,
 labels: Array[String]) extends Model[StringIndexerModel] with 
StringIndexerBase {
 
   private val labelToIndex: OpenHashMap[String, Double] = {
@@ -112,8 +107,7 @@ class StringIndexerModel private[ml] (
   /** @group setParam */
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
-val map = extractParamMap(paramMap)
+  override def transform(dataset: DataFrame): DataFrame = {
 val indexer = udf { label: String =>
   if (labelToIndex.contains(label)) {
 labelToIndex(label)
@@ -122,14 +116,14 @@ class StringIndexerModel private[ml] (
 throw new SparkException(s"Unseen label: $label.")
   }
 }
-val outputColName = map(outputCol)
+val outputColName = $(outputCol)
 val metadata = NominalAttribute.defaultAttr
   .withName(outputColName).withValues(labels).toMetadata()
 dataset.select(col("*"),
-  indexer(dataset(map(inputCol)).cast(StringType)).as(outputColName, 
metadata))
+  indexer(dataset($(inputCol)).cast(StringType)).as(outputColName, 
metadata))
   }
 
-  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
-validateAndTransformSchema(schema, paramMap)
+  override def transformSchema(schema: StructType): StructType = {
+validateAndTransformSchema(schema)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
index 01752ba..2863b76 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.feature
 import 

[1/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.

2015-05-04 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 5a1a1075a -> e0833c595


http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
 
b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
index 0cc36c8..a82b86d 100644
--- 
a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
@@ -23,14 +23,15 @@ import java.util.List;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import static org.apache.spark.mllib.classification.LogisticRegressionSuite
-.generateLogisticInputAsList;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
+import static org.apache.spark.mllib.classification.LogisticRegressionSuite
+  .generateLogisticInputAsList;
 
 
 public class JavaLinearRegressionSuite implements Serializable {
@@ -65,8 +66,8 @@ public class JavaLinearRegressionSuite implements 
Serializable {
 DataFrame predictions = jsql.sql("SELECT label, prediction FROM prediction");
 predictions.collect();
 // Check defaults
-assert(model.getFeaturesCol().equals("features"));
-assert(model.getPredictionCol().equals("prediction"));
+assertEquals("features", model.getFeaturesCol());
+assertEquals("prediction", model.getPredictionCol());
   }
 
   @Test
@@ -76,14 +77,16 @@ public class JavaLinearRegressionSuite implements 
Serializable {
 .setMaxIter(10)
 .setRegParam(1.0);
 LinearRegressionModel model = lr.fit(dataset);
-assert(model.fittingParamMap().apply(lr.maxIter()).equals(10));
-assert(model.fittingParamMap().apply(lr.regParam()).equals(1.0));
+LinearRegression parent = model.parent();
+assertEquals(10, parent.getMaxIter());
+assertEquals(1.0, parent.getRegParam(), 0.0);
 
 // Call fit() with new params, and check as many params as we can.
 LinearRegressionModel model2 =
 lr.fit(dataset, lr.maxIter().w(5), lr.regParam().w(0.1), lr.predictionCol().w("thePred"));
-assert(model2.fittingParamMap().apply(lr.maxIter()).equals(5));
-assert(model2.fittingParamMap().apply(lr.regParam()).equals(0.1));
-assert(model2.getPredictionCol().equals("thePred"));
+LinearRegression parent2 = model2.parent();
+assertEquals(5, parent2.getMaxIter());
+assertEquals(0.1, parent2.getRegParam(), 0.0);
+assertEquals("thePred", model2.getPredictionCol());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java 
b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
index 0bb6b48..08eeca5 100644
--- 
a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
@@ -68,8 +68,8 @@ public class JavaCrossValidatorSuite implements Serializable {
   .setEvaluator(eval)
   .setNumFolds(3);
 CrossValidatorModel cvModel = cv.fit(dataset);
-ParamMap bestParamMap = cvModel.bestModel().fittingParamMap();
-Assert.assertEquals(0.001, bestParamMap.apply(lr.regParam()));
-Assert.assertEquals(10, bestParamMap.apply(lr.maxIter()));
+LogisticRegression parent = (LogisticRegression) 
cvModel.bestModel().parent();
+Assert.assertEquals(0.001, parent.getRegParam(), 0.0);
+Assert.assertEquals(10, parent.getMaxIter());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e0833c59/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
index 2f175fb..2b04a30 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
@@ -42,30 +42,32 @@ class PipelineSuite extends FunSuite {
 val dataset3 = mock[DataFrame]
 val dataset4 = mock[DataFrame]
 
-when(estimator0.fit(meq(dataset0), any[ParamMap]())).thenReturn(model0)
-when(model0.transform(meq(dataset0), any[ParamMap]())).thenReturn(dataset1)
+when(estimator0.copy(any[ParamMap])).thenReturn(estimator0)
+when(model0.copy(any[ParamMap])).thenReturn(model0)
+

[1/3] spark git commit: [SPARK-5956] [MLLIB] Pipeline components should be copyable.

2015-05-04 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 343d3bfaf -> 893b3103f


http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
 
b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
index 0cc36c8..a82b86d 100644
--- 
a/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
@@ -23,14 +23,15 @@ import java.util.List;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import static org.apache.spark.mllib.classification.LogisticRegressionSuite
-.generateLogisticInputAsList;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
+import static org.apache.spark.mllib.classification.LogisticRegressionSuite
+  .generateLogisticInputAsList;
 
 
 public class JavaLinearRegressionSuite implements Serializable {
@@ -65,8 +66,8 @@ public class JavaLinearRegressionSuite implements 
Serializable {
 DataFrame predictions = jsql.sql("SELECT label, prediction FROM prediction");
 predictions.collect();
 // Check defaults
-assert(model.getFeaturesCol().equals("features"));
-assert(model.getPredictionCol().equals("prediction"));
+assertEquals("features", model.getFeaturesCol());
+assertEquals("prediction", model.getPredictionCol());
   }
 
   @Test
@@ -76,14 +77,16 @@ public class JavaLinearRegressionSuite implements 
Serializable {
 .setMaxIter(10)
 .setRegParam(1.0);
 LinearRegressionModel model = lr.fit(dataset);
-assert(model.fittingParamMap().apply(lr.maxIter()).equals(10));
-assert(model.fittingParamMap().apply(lr.regParam()).equals(1.0));
+LinearRegression parent = model.parent();
+assertEquals(10, parent.getMaxIter());
+assertEquals(1.0, parent.getRegParam(), 0.0);
 
 // Call fit() with new params, and check as many params as we can.
 LinearRegressionModel model2 =
 lr.fit(dataset, lr.maxIter().w(5), lr.regParam().w(0.1), lr.predictionCol().w("thePred"));
-assert(model2.fittingParamMap().apply(lr.maxIter()).equals(5));
-assert(model2.fittingParamMap().apply(lr.regParam()).equals(0.1));
-assert(model2.getPredictionCol().equals("thePred"));
+LinearRegression parent2 = model2.parent();
+assertEquals(5, parent2.getMaxIter());
+assertEquals(0.1, parent2.getRegParam(), 0.0);
+assertEquals("thePred", model2.getPredictionCol());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java 
b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
index 0bb6b48..08eeca5 100644
--- 
a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java
@@ -68,8 +68,8 @@ public class JavaCrossValidatorSuite implements Serializable {
   .setEvaluator(eval)
   .setNumFolds(3);
 CrossValidatorModel cvModel = cv.fit(dataset);
-ParamMap bestParamMap = cvModel.bestModel().fittingParamMap();
-Assert.assertEquals(0.001, bestParamMap.apply(lr.regParam()));
-Assert.assertEquals(10, bestParamMap.apply(lr.maxIter()));
+LogisticRegression parent = (LogisticRegression) 
cvModel.bestModel().parent();
+Assert.assertEquals(0.001, parent.getRegParam(), 0.0);
+Assert.assertEquals(10, parent.getMaxIter());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/893b3103/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
index 2f175fb..2b04a30 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
@@ -42,30 +42,32 @@ class PipelineSuite extends FunSuite {
 val dataset3 = mock[DataFrame]
 val dataset4 = mock[DataFrame]
 
-when(estimator0.fit(meq(dataset0), any[ParamMap]())).thenReturn(model0)
-when(model0.transform(meq(dataset0), any[ParamMap]())).thenReturn(dataset1)
+when(estimator0.copy(any[ParamMap])).thenReturn(estimator0)
+when(model0.copy(any[ParamMap])).thenReturn(model0)
+

spark git commit: [SPARK-7319][SQL] Improve the output from DataFrame.show()

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master e0833c595 -> f32e69ecc


[SPARK-7319][SQL] Improve the output from DataFrame.show()

Author: 云峤 chensong...@alibaba-inc.com

Closes #5865 from kaka1992/df.show and squashes the following commits:

c79204b [云峤] Update
a1338f6 [云峤] Update python dataFrame show test and add empty df unit test.
734369c [云峤] Update python dataFrame show test and add empty df unit test.
84aec3e [云峤] Update python dataFrame show test and add empty df unit test.
159b3d5 [云峤] update
03ef434 [云峤] update
7394fd5 [云峤] update test show
ced487a [云峤] update pep8
b6e690b [云峤] Merge remote-tracking branch 'upstream/master' into df.show
30ac311 [云峤] [SPARK-7294] ADD BETWEEN
7d62368 [云峤] [SPARK-7294] ADD BETWEEN
baf839b [云峤] [SPARK-7294] ADD BETWEEN
d11d5b9 [云峤] [SPARK-7294] ADD BETWEEN


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f32e69ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f32e69ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f32e69ec

Branch: refs/heads/master
Commit: f32e69ecc333867fc966f65cd0aeaeddd43e0945
Parents: e0833c5
Author: 云峤 chensong...@alibaba-inc.com
Authored: Mon May 4 12:08:38 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 12:08:38 2015 -0700

--
 R/pkg/R/DataFrame.R |   2 +-
 R/pkg/inst/tests/test_sparkSQL.R|   2 +-
 python/pyspark/sql/dataframe.py | 105 ---
 .../scala/org/apache/spark/sql/DataFrame.scala  |  28 +++--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  19 
 5 files changed, 112 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f32e69ec/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b59b700..841e77e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -167,7 +167,7 @@ setMethod("isLocal",
 setMethod("showDF",
           signature(x = "DataFrame"),
           function(x, numRows = 20) {
-            cat(callJMethod(x@sdf, "showString", numToInt(numRows)), "\n")
+            callJMethod(x@sdf, "showString", numToInt(numRows))
   })
 
 #' show

http://git-wip-us.apache.org/repos/asf/spark/blob/f32e69ec/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index af7a6c5..f82e56f 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -641,7 +641,7 @@ test_that("toJSON() returns an RDD of the correct values", {
 
 test_that("showDF()", {
   df <- jsonFile(sqlCtx, jsonPath)
-  expect_output(showDF(df), "age  name   \nnull Michael\n30   Andy   \n19   Justin ")
+  expect_output(showDF(df), "+----+-------+\n| age|   name|\n+----+-------+\n|null|Michael|\n|  30|   Andy|\n|  19| Justin|\n+----+-------+\n")
 })
 
  test_that("isLocal()", {

http://git-wip-us.apache.org/repos/asf/spark/blob/f32e69ec/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index aac5b8c..22762c5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -275,9 +275,12 @@ class DataFrame(object):
 >>> df
 DataFrame[age: int, name: string]
 >>> df.show()
-age name
-2   Alice
-5   Bob
++---+-----+
+|age| name|
++---+-----+
+|  2|Alice|
+|  5|  Bob|
++---+-----+
 
 print(self._jdf.showString(n))
 
@@ -591,12 +594,15 @@ class DataFrame(object):
 given, this function computes statistics for all numerical columns.
 
 >>> df.describe().show()
-summary age
-count   2
-mean3.5
-stddev  1.5
-min 2
-max 5
++-------+---+
+|summary|age|
++-------+---+
+|  count|  2|
+|   mean|3.5|
+| stddev|1.5|
+|    min|  2|
+|    max|  5|
++-------+---+
 
 jdf = self._jdf.describe(self._jseq(cols))
 return DataFrame(jdf, self.sql_ctx)
@@ -801,12 +807,18 @@ class DataFrame(object):
 :param subset: optional list of column names to consider.
 
 >>> df4.dropna().show()
-age height name
-10  80 Alice
++---+------+-----+
+|age|height| name|
++---+------+-----+
+| 10|    80|Alice|
++---+------+-----+
 
 >>> df4.na.drop().show()
-age height name
-10  80 Alice
++---+------+-----+
+

spark git commit: [SPARK-7266] Add ExpectsInputTypes to expressions when possible.

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 ecf0d8a9f -> 1388a469b


[SPARK-7266] Add ExpectsInputTypes to expressions when possible.

This should give us better analysis-time error messages (rather than runtime errors) 
and automatic type casting.
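
A self-contained illustration of the general idea (not Catalyst's actual classes; the tiny types and the `coerce` rule below are assumptions made up for the sketch): an expression declares the input types it expects, and an analysis rule wraps mismatched children in casts up front instead of failing at runtime.

~~~scala
sealed trait DType
case object StringT extends DType
case object DoubleT extends DType

trait Expr { def dataType: DType }
case class Lit(value: Any, dataType: DType) extends Expr
case class CastExpr(child: Expr, dataType: DType) extends Expr

// An expression that expects a string-typed child.
case class Upper(child: Expr) extends Expr {
  val dataType: DType = StringT
  val expectedInputTypes: Seq[DType] = Seq(StringT)
}

// A tiny "analysis rule": insert a cast when the child's type does not match.
def coerce(u: Upper): Upper =
  if (u.child.dataType == u.expectedInputTypes.head) u
  else Upper(CastExpr(u.child, u.expectedInputTypes.head))

val analyzed = coerce(Upper(Lit(1.5, DoubleT)))  // the double literal gets wrapped in a cast
~~~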

Author: Reynold Xin r...@databricks.com

Closes #5796 from rxin/expected-input-types and squashes the following commits:

c900760 [Reynold Xin] [SPARK-7266] Add ExpectsInputTypes to expressions when 
possible.

(cherry picked from commit 678c4da0fa1bbfb6b5a0d3aced7aefa1bbbc193c)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1388a469
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1388a469
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1388a469

Branch: refs/heads/branch-1.4
Commit: 1388a469b178486eac8645e155aa9c94bae6a181
Parents: ecf0d8a
Author: Reynold Xin r...@databricks.com
Authored: Mon May 4 18:03:07 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 18:03:33 2015 -0700

--
 .../catalyst/analysis/HiveTypeCoercion.scala| 58 ++--
 .../sql/catalyst/expressions/Expression.scala   |  3 +-
 .../sql/catalyst/expressions/arithmetic.scala   |  4 +-
 .../sql/catalyst/expressions/predicates.scala   | 22 +---
 .../catalyst/expressions/stringOperations.scala | 40 --
 5 files changed, 71 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1388a469/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 73c9a1c..831fb4f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -239,37 +239,43 @@ trait HiveTypeCoercion {
 a.makeCopy(Array(a.left, Cast(a.right, DoubleType)))
 
   // we should cast all timestamp/date/string compare into string compare
-  case p: BinaryPredicate if p.left.dataType == StringType
-      && p.right.dataType == DateType =>
+  case p: BinaryComparison if p.left.dataType == StringType &&
+      p.right.dataType == DateType =>
 p.makeCopy(Array(p.left, Cast(p.right, StringType)))
-  case p: BinaryPredicate if p.left.dataType == DateType
-      && p.right.dataType == StringType =>
+  case p: BinaryComparison if p.left.dataType == DateType &&
+      p.right.dataType == StringType =>
 p.makeCopy(Array(Cast(p.left, StringType), p.right))
-  case p: BinaryPredicate if p.left.dataType == StringType
-      && p.right.dataType == TimestampType =>
+  case p: BinaryComparison if p.left.dataType == StringType &&
+      p.right.dataType == TimestampType =>
 p.makeCopy(Array(p.left, Cast(p.right, StringType)))
-  case p: BinaryPredicate if p.left.dataType == TimestampType
-      && p.right.dataType == StringType =>
+  case p: BinaryComparison if p.left.dataType == TimestampType &&
+      p.right.dataType == StringType =>
 p.makeCopy(Array(Cast(p.left, StringType), p.right))
-  case p: BinaryPredicate if p.left.dataType == TimestampType
-      && p.right.dataType == DateType =>
+  case p: BinaryComparison if p.left.dataType == TimestampType &&
+      p.right.dataType == DateType =>
 p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType)))
-  case p: BinaryPredicate if p.left.dataType == DateType
-      && p.right.dataType == TimestampType =>
+  case p: BinaryComparison if p.left.dataType == DateType &&
+      p.right.dataType == TimestampType =>
 p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType)))

-  case p: BinaryPredicate if p.left.dataType == StringType && p.right.dataType != StringType =>
+  case p: BinaryComparison if p.left.dataType == StringType &&
+      p.right.dataType != StringType =>
 p.makeCopy(Array(Cast(p.left, DoubleType), p.right))
-  case p: BinaryPredicate if p.left.dataType != StringType && p.right.dataType == StringType =>
+  case p: BinaryComparison if p.left.dataType != StringType &&
+      p.right.dataType == StringType =>
 p.makeCopy(Array(p.left, Cast(p.right, DoubleType)))

-  case i @ In(a, b) if a.dataType == DateType && b.forall(_.dataType == 

spark git commit: [SPARK-7266] Add ExpectsInputTypes to expressions when possible.

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 805541117 -> 678c4da0f


[SPARK-7266] Add ExpectsInputTypes to expressions when possible.

This should give us better analysis-time error messages (rather than runtime errors) 
and automatic type casting.

Author: Reynold Xin r...@databricks.com

Closes #5796 from rxin/expected-input-types and squashes the following commits:

c900760 [Reynold Xin] [SPARK-7266] Add ExpectsInputTypes to expressions when 
possible.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/678c4da0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/678c4da0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/678c4da0

Branch: refs/heads/master
Commit: 678c4da0fa1bbfb6b5a0d3aced7aefa1bbbc193c
Parents: 8055411
Author: Reynold Xin r...@databricks.com
Authored: Mon May 4 18:03:07 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 18:03:07 2015 -0700

--
 .../catalyst/analysis/HiveTypeCoercion.scala| 58 ++--
 .../sql/catalyst/expressions/Expression.scala   |  3 +-
 .../sql/catalyst/expressions/arithmetic.scala   |  4 +-
 .../sql/catalyst/expressions/predicates.scala   | 22 +---
 .../catalyst/expressions/stringOperations.scala | 40 --
 5 files changed, 71 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/678c4da0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 73c9a1c..831fb4f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -239,37 +239,43 @@ trait HiveTypeCoercion {
 a.makeCopy(Array(a.left, Cast(a.right, DoubleType)))
 
   // we should cast all timestamp/date/string compare into string compare
-  case p: BinaryPredicate if p.left.dataType == StringType
-      && p.right.dataType == DateType =>
+  case p: BinaryComparison if p.left.dataType == StringType &&
+      p.right.dataType == DateType =>
 p.makeCopy(Array(p.left, Cast(p.right, StringType)))
-  case p: BinaryPredicate if p.left.dataType == DateType
-      && p.right.dataType == StringType =>
+  case p: BinaryComparison if p.left.dataType == DateType &&
+      p.right.dataType == StringType =>
 p.makeCopy(Array(Cast(p.left, StringType), p.right))
-  case p: BinaryPredicate if p.left.dataType == StringType
-      && p.right.dataType == TimestampType =>
+  case p: BinaryComparison if p.left.dataType == StringType &&
+      p.right.dataType == TimestampType =>
 p.makeCopy(Array(p.left, Cast(p.right, StringType)))
-  case p: BinaryPredicate if p.left.dataType == TimestampType
-      && p.right.dataType == StringType =>
+  case p: BinaryComparison if p.left.dataType == TimestampType &&
+      p.right.dataType == StringType =>
 p.makeCopy(Array(Cast(p.left, StringType), p.right))
-  case p: BinaryPredicate if p.left.dataType == TimestampType
-      && p.right.dataType == DateType =>
+  case p: BinaryComparison if p.left.dataType == TimestampType &&
+      p.right.dataType == DateType =>
 p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType)))
-  case p: BinaryPredicate if p.left.dataType == DateType
-      && p.right.dataType == TimestampType =>
+  case p: BinaryComparison if p.left.dataType == DateType &&
+      p.right.dataType == TimestampType =>
 p.makeCopy(Array(Cast(p.left, StringType), Cast(p.right, StringType)))

-  case p: BinaryPredicate if p.left.dataType == StringType && p.right.dataType != StringType =>
+  case p: BinaryComparison if p.left.dataType == StringType &&
+      p.right.dataType != StringType =>
 p.makeCopy(Array(Cast(p.left, DoubleType), p.right))
-  case p: BinaryPredicate if p.left.dataType != StringType && p.right.dataType == StringType =>
+  case p: BinaryComparison if p.left.dataType != StringType &&
+      p.right.dataType == StringType =>
 p.makeCopy(Array(p.left, Cast(p.right, DoubleType)))

-  case i @ In(a, b) if a.dataType == DateType && b.forall(_.dataType == StringType) =>
+  case i @ In(a, b) if a.dataType == DateType &&
+      b.forall(_.dataType == StringType) =>

spark git commit: [SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 678c4da0f -> 8aa5aea7f


[SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on 
final attempt

Added a check so that if `AkkaUtils.askWithReply` is on the final attempt, it 
will not sleep for the `retryInterval`.  This should also prevent the thread 
from sleeping for `Int.Max` when using `askWithReply` with default values for 
`maxAttempts` and `retryInterval`.
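
A minimal sketch of the retry pattern this fix moves to (a standalone example, not the actual `AkkaUtils` code): sleep between attempts, but never after the last one.

~~~scala
def retry[T](maxAttempts: Int, retryInterval: Long)(op: => T): T = {
  var attempts = 0
  var lastException: Throwable = null
  while (attempts < maxAttempts) {
    attempts += 1
    try {
      return op
    } catch {
      case e: Exception =>
        lastException = e
        // Only sleep if another attempt is coming; never sleep after the final one.
        if (attempts < maxAttempts) Thread.sleep(retryInterval)
    }
  }
  throw new RuntimeException(s"Failed after $maxAttempts attempts", lastException)
}
~~~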

Author: Bryan Cutler bjcut...@us.ibm.com

Closes #5896 from BryanCutler/askWithReply-sleep-7236 and squashes the 
following commits:

653a07b [Bryan Cutler] [SPARK-7236] Fix to prevent AkkaUtils askWithReply from 
sleeping on final attempt


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa5aea7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa5aea7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa5aea7

Branch: refs/heads/master
Commit: 8aa5aea7fee0ae9cd34e16c30655ee02b8747455
Parents: 678c4da
Author: Bryan Cutler bjcut...@us.ibm.com
Authored: Mon May 4 18:29:22 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 18:29:22 2015 -0700

--
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8aa5aea7/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index b725df3..de3316d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -183,7 +183,9 @@ private[spark] object AkkaUtils extends Logging {
   lastException = e
   logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
   }
-  Thread.sleep(retryInterval)
+  if (attempts < maxAttempts) {
+Thread.sleep(retryInterval)
+  }
 }
 
 throw new SparkException(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on final attempt

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 1388a469b -> 48655d10e


[SPARK-7236] [CORE] Fix to prevent AkkaUtils askWithReply from sleeping on 
final attempt

Added a check so that if `AkkaUtils.askWithReply` is on the final attempt, it 
will not sleep for the `retryInterval`.  This should also prevent the thread 
from sleeping for `Int.Max` when using `askWithReply` with default values for 
`maxAttempts` and `retryInterval`.

Author: Bryan Cutler bjcut...@us.ibm.com

Closes #5896 from BryanCutler/askWithReply-sleep-7236 and squashes the 
following commits:

653a07b [Bryan Cutler] [SPARK-7236] Fix to prevent AkkaUtils askWithReply from 
sleeping on final attempt

(cherry picked from commit 8aa5aea7fee0ae9cd34e16c30655ee02b8747455)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48655d10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48655d10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48655d10

Branch: refs/heads/branch-1.4
Commit: 48655d10ede9f286140ebe6d72be04b8ea9c3d85
Parents: 1388a46
Author: Bryan Cutler bjcut...@us.ibm.com
Authored: Mon May 4 18:29:22 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 18:30:02 2015 -0700

--
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/48655d10/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index b725df3..de3316d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -183,7 +183,9 @@ private[spark] object AkkaUtils extends Logging {
           lastException = e
           logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
         }
-      Thread.sleep(retryInterval)
+      if (attempts < maxAttempts) {
+        Thread.sleep(retryInterval)
+      }
     }
 
 throw new SparkException(





[6/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/d3.min.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/d3.min.js 
b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js
new file mode 100644
index 000..30cd292
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js
@@ -0,0 +1,5 @@
+/* v3.5.5 */
+/* [minified d3.js vendor library, 5 lines in the original patch; body omitted] */

[5/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js 
b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
new file mode 100644
index 000..6d2da25
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
@@ -0,0 +1,29 @@
+/* v0.4.3 with 1 additional commit (see http://github.com/andrewor14/dagre-d3) */
+/* [minified dagre-d3 vendor library, 29 lines in the original patch, including its MIT license header; body omitted] */

[1/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master f32e69ecc - fc8b58195


http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee02fbd..3f162d1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -28,10 +28,11 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 
+import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
-import org.apache.spark._
 
 /**
  * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
@@ -228,6 +229,7 @@ private[spark] object JsonProtocol {
 
   def stageInfoToJson(stageInfo: StageInfo): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
+    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -236,6 +238,7 @@ private[spark] object JsonProtocol {
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Parent IDs" -> parentIds) ~
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
@@ -368,8 +371,11 @@ private[spark] object JsonProtocol {
 
   def rddInfoToJson(rddInfo: RDDInfo): JValue = {
     val storageLevel = storageLevelToJson(rddInfo.storageLevel)
+    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
+    ("Scope" -> rddInfo.scope.map(_.toJson)) ~
+    ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
@@ -519,7 +525,7 @@ private[spark] object JsonProtocol {
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
-        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
@@ -599,7 +605,10 @@ private[spark] object JsonProtocol {
     val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
-    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
+    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -609,7 +618,8 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }
 
-    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
@@ -785,16 +795,22 @@ private[spark] object JsonProtocol {
   def rddInfoFromJson(json: JValue): RDDInfo = {
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
+    val scope = Utils.jsonOption(json \ "Scope")
+      .map(_.extract[String])
+      .map(RDDOperationScope.fromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
-    // fallback to tachyon for backward compatability
+    // fallback to tachyon for backward compatibility
 val 
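
To make the new fields concrete, here is a small, hypothetical round-trip check (not part of this patch). It assumes the 7-argument StageInfo constructor shown in the diff above, and it must be compiled inside the org.apache.spark.util package because JsonProtocol is private[spark]:

package org.apache.spark.util   // assumption: needed so the private[spark] JsonProtocol is visible

import org.apache.spark.scheduler.StageInfo
import org.json4s.jackson.JsonMethods.{compact, render}

object ParentIdsRoundTrip {
  def main(args: Array[String]): Unit = {
    // Arguments follow the diff above: id, attemptId, name, numTasks, rddInfos, parentIds, details.
    val stage = new StageInfo(1, 0, "map at Example.scala:10", 4, Seq.empty, Seq(0, 2), "details")
    val json = JsonProtocol.stageInfoToJson(stage)
    println(compact(render(json)))                 // the JSON now carries "Parent IDs":[0,2]
    val restored = JsonProtocol.stageInfoFromJson(json)
    assert(restored.parentIds == Seq(0, 2))        // the new field survives the round trip
  }
}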

[4/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js
--
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js 
b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js
new file mode 100644
index 000..037316f
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js
@@ -0,0 +1,4 @@
+/* v0.5.2 */
+/* [minified graphlib-dot vendor library, 4 lines in the original patch; body omitted] */

[5/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js 
b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
new file mode 100644
index 000..6d2da25
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
@@ -0,0 +1,29 @@
+/* v0.4.3 with 1 additional commit (see http://github.com/andrewor14/dagre-d3) */
+/* [minified dagre-d3 vendor library, 29 lines in the original patch, including its MIT license header; body omitted] */

[7/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
[SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

This patch adds the functionality to display the RDD DAG on the SparkUI.

This DAG describes the relationships between
- an RDD and its dependencies,
- an RDD and its operation scopes, and
- an RDD's operation scopes and the stage / job hierarchy

An operation scope here refers to the existing public APIs that created the 
RDDs (e.g. `textFile`, `treeAggregate`). In the future, we can expand this to 
include higher level operations like SQL queries.
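
As a purely illustrative example (ordinary user code, assuming a local run and a README.md in the working directory), each public API call below creates its RDDs inside a named operation scope, and the visualization draws one labeled cluster per scope on the job and stage pages:

import org.apache.spark.SparkContext

object DagVizDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "dag-viz-demo")
    val counts = sc.textFile("README.md")    // scope: textFile
      .flatMap(_.split(" "))                 // scope: flatMap
      .map(word => (word, 1))                // scope: map
      .reduceByKey(_ + _)                    // scope: reduceByKey
    counts.count()                           // runs a job; its DAG appears on the job and stage pages
    sc.stop()
  }
}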

*Note: This blatantly stole a few lines of HTML and JavaScript from #5547 
(thanks shroffpradyumn!)*

Here's what the job page looks like:
<img src="https://issues.apache.org/jira/secure/attachment/12730286/job-page.png" width="700px"/>
and the stage page:
<img src="https://issues.apache.org/jira/secure/attachment/12730287/stage-page.png" width="300px"/>

Author: Andrew Or and...@databricks.com

Closes #5729 from andrewor14/viz2 and squashes the following commits:

666c03b [Andrew Or] Round corners of RDD boxes on stage page (minor)
01ba336 [Andrew Or] Change RDD cache color to red (minor)
6f9574a [Andrew Or] Add tests for RDDOperationScope
1c310e4 [Andrew Or] Wrap a few more RDD functions in an operation scope
3ffe566 [Andrew Or] Restore null as default for RDD name
5fdd89d [Andrew Or] children - child (minor)
0d07a84 [Andrew Or] Fix python style
afb98e2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
0d7aa32 [Andrew Or] Fix python tests
3459ab2 [Andrew Or] Fix tests
832443c [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
429e9e1 [Andrew Or] Display cached RDDs on the viz
b1f0fd1 [Andrew Or] Rename OperatorScope - RDDOperationScope
31aae06 [Andrew Or] Extract visualization logic from listener
83f9c58 [Andrew Or] Implement a programmatic representation of operator scopes
5a7faf4 [Andrew Or] Rename references to viz scopes to viz clusters
ee33d52 [Andrew Or] Separate HTML generating code from listener
f9830a2 [Andrew Or] Refactor + clean up + document JS visualization code
b80cc52 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
0706992 [Andrew Or] Add link from jobs to stages
deb48a0 [Andrew Or] Translate stage boxes taking into account the width
5c7ce16 [Andrew Or] Connect RDDs across stages + update style
ab91416 [Andrew Or] Introduce visualization to the Job Page
5f07e9c [Andrew Or] Remove more return statements from scopes
5e388ea [Andrew Or] Fix line too long
43de96e [Andrew Or] Add parent IDs to StageInfo
6e2cfea [Andrew Or] Remove all return statements in `withScope`
d19c4da [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
7ef957c [Andrew Or] Fix scala style
4310271 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
aa868a9 [Andrew Or] Ensure that HadoopRDD is actually serializable
c3bfcae [Andrew Or] Re-implement scopes using closures instead of annotations
52187fc [Andrew Or] Rat excludes
09d361e [Andrew Or] Add ID to node label (minor)
71281fa [Andrew Or] Embed the viz in the UI in a toggleable manner
8dd5af2 [Andrew Or] Fill in documentation + miscellaneous minor changes
fe7816f [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz
205f838 [Andrew Or] Reimplement rendering with dagre-d3 instead of viz.js
5e22946 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz
6a7cdca [Andrew Or] Move RDD scope util methods and logic to its own file
494d5c2 [Andrew Or] Revert a few unintended style changes
9fac6f3 [Andrew Or] Re-implement scopes through annotations instead
f22f337 [Andrew Or] First working implementation of visualization with vis.js
2184348 [Andrew Or] Translate RDD information to dot file
5143523 [Andrew Or] Expose the necessary information in RDDInfo
a9ed4f9 [Andrew Or] Add a few missing scopes to certain RDD methods
6b3403b [Andrew Or] Scope all RDD methods


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/863ec0cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/863ec0cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/863ec0cb

Branch: refs/heads/branch-1.4
Commit: 863ec0cb4de7dc77987117b35454cf79e240b1e7
Parents: 34edaa8
Author: Andrew Or and...@databricks.com
Authored: Mon May 4 16:21:36 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Mon May 4 16:24:35 2015 -0700

--
 .rat-excludes   |   3 +
 .../org/apache/spark/ui/static/d3.min.js|   5 +
 .../org/apache/spark/ui/static/dagre-d3.min.js  |  29 ++
 .../apache/spark/ui/static/graphlib-dot.min.js  |   4 +
 .../org/apache/spark/ui/static/spark-dag-viz.js | 392 +++
 .../org/apache/spark/ui/static/webui.css|   2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  97 +++--
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |  10 +-
 

[3/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
--
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js 
b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
new file mode 100644
index 000..99b0294
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file contains the logic to render the RDD DAG visualization in the UI.
+ *
+ * This DAG describes the relationships between
+ *   (1) an RDD and its dependencies,
+ *   (2) an RDD and its operation scopes, and
+ *   (3) an RDD's operation scopes and the stage / job hierarchy
+ *
+ * An operation scope is a general, named code block representing an operation
+ * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation
+ * scope can be nested inside of other scopes if the corresponding RDD 
operation
+ * invokes other such operations (for more detail, see 
o.a.s.rdd.operationScope).
+ *
+ * A stage may include one or more operation scopes if the RDD operations are
+ * streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)).
+ * On the flip side, an operation scope may also include one or many stages,
+ * or even jobs if the RDD operation is higher level than Spark's scheduling
+ * primitives (e.g. take, any SQL query).
+ *
+ * In the visualization, an RDD is expressed as a node, and its dependencies
+ * as directed edges (from parent to child). operation scopes, stages, and
+ * jobs are expressed as clusters that may contain one or many nodes. These
+ * clusters may be nested inside of each other in the scenarios described
+ * above.
+ *
+ * The visualization is rendered in an SVG contained in div#dag-viz-graph,
+ * and its input data is expected to be populated in div#dag-viz-metadata
+ * by Spark's UI code. This is currently used only on the stage page and on
+ * the job page.
+ *
+ * This requires jQuery, d3, and dagre-d3. Note that we use a custom release
+ * of dagre-d3 (http://github.com/andrewor14/dagre-d3) for some specific
+ * functionality. For more detail, please track the changes in that project
+ * since it was forked (commit 101503833a8ce5fe369547f6addf3e71172ce10b).
+ */
+
+var VizConstants = {
+  rddColor: #44,
+  rddCachedColor: #FF,
+  rddOperationColor: #AADFFF,
+  stageColor: #FFDDEE,
+  clusterLabelColor: #88,
+  edgeColor: #44,
+  edgeWidth: 1.5px,
+  svgMarginX: 0,
+  svgMarginY: 20,
+  stageSep: 50,
+  graphPrefix: graph_,
+  nodePrefix: node_,
+  stagePrefix: stage_,
+  clusterPrefix: cluster_,
+  stageClusterPrefix: cluster_stage_
+};
+
+// Helper d3 accessors for the elements that contain our graph and its metadata
+function graphContainer() { return d3.select("#dag-viz-graph"); }
+function metadataContainer() { return d3.select("#dag-viz-metadata"); }
+
+/*
+ * Show or hide the RDD DAG visualization.
+ * The graph is only rendered the first time this is called.
+ */
+function toggleDagViz(forJob) {
+  var arrowSelector = ".expand-dag-viz-arrow";
+  $(arrowSelector).toggleClass('arrow-closed');
+  $(arrowSelector).toggleClass('arrow-open');
+  var shouldShow = $(arrowSelector).hasClass("arrow-open");
+  if (shouldShow) {
+    var shouldRender = graphContainer().select("svg").empty();
+    if (shouldRender) {
+      renderDagViz(forJob);
+    }
+    graphContainer().style("display", "block");
+  } else {
+    // Save the graph for later so we don't have to render it again
+    graphContainer().style("display", "none");
+  }
+}
+
+/*
+ * Render the RDD DAG visualization.
+ *
+ * Input DOM hierarchy:
+ *   div#dag-viz-metadata >
+ *   div.stage-metadata >
+ *   div.[dot-file | incoming-edge | outgoing-edge]
+ *
+ * Output DOM hierarchy:
+ *   div#dag-viz-graph >
+ *   svg >
+ *   g#cluster_stage_[stageId]
+ *
+ * Note that the input metadata is populated by o.a.s.ui.UIUtils.showDagViz.
+ * Any changes in the input format here must be reflected there.
+ */
+function renderDagViz(forJob) {
+
+  // If there is not a dot file to render, fail fast and report 

[1/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 34edaa8ac - 863ec0cb4


http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee02fbd..3f162d1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -28,10 +28,11 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 
+import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
-import org.apache.spark._
 
 /**
  * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
@@ -228,6 +229,7 @@ private[spark] object JsonProtocol {
 
   def stageInfoToJson(stageInfo: StageInfo): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
+    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -236,6 +238,7 @@ private[spark] object JsonProtocol {
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Parent IDs" -> parentIds) ~
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
@@ -368,8 +371,11 @@ private[spark] object JsonProtocol {
 
   def rddInfoToJson(rddInfo: RDDInfo): JValue = {
     val storageLevel = storageLevelToJson(rddInfo.storageLevel)
+    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
+    ("Scope" -> rddInfo.scope.map(_.toJson)) ~
+    ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
@@ -519,7 +525,7 @@ private[spark] object JsonProtocol {
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
-        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
@@ -599,7 +605,10 @@ private[spark] object JsonProtocol {
     val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
    val numTasks = (json \ "Number of Tasks").extract[Int]
-    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
+    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -609,7 +618,8 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }
 
-    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
@@ -785,16 +795,22 @@ private[spark] object JsonProtocol {
   def rddInfoFromJson(json: JValue): RDDInfo = {
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
+    val scope = Utils.jsonOption(json \ "Scope")
+      .map(_.extract[String])
+      .map(RDDOperationScope.fromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
-    // fallback to tachyon for backward compatability
+    // fallback to tachyon for backward compatibility
 

[6/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/d3.min.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/d3.min.js 
b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js
new file mode 100644
index 000..30cd292
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/d3.min.js
@@ -0,0 +1,5 @@
+/* v3.5.5 */
+/* [minified d3.js vendor library, 5 lines in the original patch; body omitted] */

[7/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
[SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

This patch adds the functionality to display the RDD DAG on the SparkUI.

This DAG describes the relationships between
- an RDD and its dependencies,
- an RDD and its operation scopes, and
- an RDD's operation scopes and the stage / job hierarchy

An operation scope here refers to the existing public APIs that created the 
RDDs (e.g. `textFile`, `treeAggregate`). In the future, we can expand this to 
include higher level operations like SQL queries.

*Note: This blatantly stole a few lines of HTML and JavaScript from #5547 
(thanks shroffpradyumn!)*

Here's what the job page looks like:
<img src="https://issues.apache.org/jira/secure/attachment/12730286/job-page.png" width="700px"/>
and the stage page:
<img src="https://issues.apache.org/jira/secure/attachment/12730287/stage-page.png" width="300px"/>

Author: Andrew Or and...@databricks.com

Closes #5729 from andrewor14/viz2 and squashes the following commits:

666c03b [Andrew Or] Round corners of RDD boxes on stage page (minor)
01ba336 [Andrew Or] Change RDD cache color to red (minor)
6f9574a [Andrew Or] Add tests for RDDOperationScope
1c310e4 [Andrew Or] Wrap a few more RDD functions in an operation scope
3ffe566 [Andrew Or] Restore null as default for RDD name
5fdd89d [Andrew Or] children - child (minor)
0d07a84 [Andrew Or] Fix python style
afb98e2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
0d7aa32 [Andrew Or] Fix python tests
3459ab2 [Andrew Or] Fix tests
832443c [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
429e9e1 [Andrew Or] Display cached RDDs on the viz
b1f0fd1 [Andrew Or] Rename OperatorScope - RDDOperationScope
31aae06 [Andrew Or] Extract visualization logic from listener
83f9c58 [Andrew Or] Implement a programmatic representation of operator scopes
5a7faf4 [Andrew Or] Rename references to viz scopes to viz clusters
ee33d52 [Andrew Or] Separate HTML generating code from listener
f9830a2 [Andrew Or] Refactor + clean up + document JS visualization code
b80cc52 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
0706992 [Andrew Or] Add link from jobs to stages
deb48a0 [Andrew Or] Translate stage boxes taking into account the width
5c7ce16 [Andrew Or] Connect RDDs across stages + update style
ab91416 [Andrew Or] Introduce visualization to the Job Page
5f07e9c [Andrew Or] Remove more return statements from scopes
5e388ea [Andrew Or] Fix line too long
43de96e [Andrew Or] Add parent IDs to StageInfo
6e2cfea [Andrew Or] Remove all return statements in `withScope`
d19c4da [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
7ef957c [Andrew Or] Fix scala style
4310271 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz2
aa868a9 [Andrew Or] Ensure that HadoopRDD is actually serializable
c3bfcae [Andrew Or] Re-implement scopes using closures instead of annotations
52187fc [Andrew Or] Rat excludes
09d361e [Andrew Or] Add ID to node label (minor)
71281fa [Andrew Or] Embed the viz in the UI in a toggleable manner
8dd5af2 [Andrew Or] Fill in documentation + miscellaneous minor changes
fe7816f [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz
205f838 [Andrew Or] Reimplement rendering with dagre-d3 instead of viz.js
5e22946 [Andrew Or] Merge branch 'master' of github.com:apache/spark into viz
6a7cdca [Andrew Or] Move RDD scope util methods and logic to its own file
494d5c2 [Andrew Or] Revert a few unintended style changes
9fac6f3 [Andrew Or] Re-implement scopes through annotations instead
f22f337 [Andrew Or] First working implementation of visualization with vis.js
2184348 [Andrew Or] Translate RDD information to dot file
5143523 [Andrew Or] Expose the necessary information in RDDInfo
a9ed4f9 [Andrew Or] Add a few missing scopes to certain RDD methods
6b3403b [Andrew Or] Scope all RDD methods


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc8b5819
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc8b5819
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc8b5819

Branch: refs/heads/master
Commit: fc8b58195afa67fbb75b4c8303e022f703cbf007
Parents: f32e69e
Author: Andrew Or and...@databricks.com
Authored: Mon May 4 16:21:36 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Mon May 4 16:21:36 2015 -0700

--
 .rat-excludes   |   3 +
 .../org/apache/spark/ui/static/d3.min.js|   5 +
 .../org/apache/spark/ui/static/dagre-d3.min.js  |  29 ++
 .../apache/spark/ui/static/graphlib-dot.min.js  |   4 +
 .../org/apache/spark/ui/static/spark-dag-viz.js | 392 +++
 .../org/apache/spark/ui/static/webui.css|   2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  97 +++--
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |  10 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala 

[2/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 31c07c7..7f7c7ed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,7 +25,7 @@ import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.TextOutputFormat
 
@@ -277,12 +277,20 @@ abstract class RDD[T: ClassTag](
     if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
   }
 
+  /**
+   * Execute a block of code in a scope such that all new RDDs created in this body will
+   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
+
   // Transformations (return a new RDD)
 
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[U: ClassTag](f: T => U): RDD[U] = {
+  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
   }
@@ -291,7 +299,7 @@ abstract class RDD[T: ClassTag](
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
+  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
   }
@@ -299,7 +307,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
-  def filter(f: T => Boolean): RDD[T] = {
+  def filter(f: T => Boolean): RDD[T] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[T, T](
       this,
@@ -310,13 +318,16 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
+  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+  }
 
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(): RDD[T] = distinct(partitions.length)
+  def distinct(): RDD[T] = withScope {
+    distinct(partitions.length)
+  }
 
   /**
    * Return a new RDD that has exactly numPartitions partitions.
@@ -327,7 +338,7 @@ abstract class RDD[T: ClassTag](
    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
-  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
+  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     coalesce(numPartitions, shuffle = true)
   }
 
@@ -352,7 +363,7 @@ abstract class RDD[T: ClassTag](
    * data distributed using a hash partitioner.
   */
   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
-      : RDD[T] = {
+      : RDD[T] = withScope {
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
@@ -377,16 +388,17 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Return a sampled subset of this RDD.
-   * 
+   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
   * @param seed seed for the random number generator
   */
-  def sample(withReplacement: Boolean,
+  def sample(
+      withReplacement: Boolean,
       fraction: Double,
-      seed: Long = Utils.random.nextLong): RDD[T] = {
+      seed: Long = Utils.random.nextLong): RDD[T] = withScope {
     require(fraction >= 0.0, "Negative fraction value: " + fraction)
     if (withReplacement) {
   new PartitionwiseSampledRDD[T, T](this, new 

[4/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

2015-05-04 Thread andrewor14
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js
--
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js 
b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js
new file mode 100644
index 000..037316f
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js
@@ -0,0 +1,4 @@
+[graphlib-dot.min.js v0.5.2: vendored minified third-party JavaScript library (DOT graph parsing for the new DAG visualization UI); minified source not reproduced]

spark git commit: [SPARK-7243][SQL] Contingency Tables for DataFrames

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 863ec0cb4 - ecf0d8a9f


[SPARK-7243][SQL] Contingency Tables for DataFrames

Computes a pair-wise frequency table of the given columns. Also known as 
cross-tabulation.
cc mengxr rxin

Author: Burak Yavuz brk...@gmail.com

Closes #5842 from brkyvz/df-cont and squashes the following commits:

a07c01e [Burak Yavuz] addressed comments v4.1
ae9e01d [Burak Yavuz] fix test
9106585 [Burak Yavuz] addressed comments v4.0
bced829 [Burak Yavuz] fix merge conflicts
a63ad00 [Burak Yavuz] addressed comments v3.0
a0cad97 [Burak Yavuz] addressed comments v3.0
6805df8 [Burak Yavuz] addressed comments and fixed test
939b7c4 [Burak Yavuz] lint python
7f098bc [Burak Yavuz] add crosstab pyTest
fd53b00 [Burak Yavuz] added python support for crosstab
27a5a81 [Burak Yavuz] implemented crosstab

(cherry picked from commit 80554111703c08e2bedbe303e04ecd162ec119e1)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf0d8a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf0d8a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf0d8a9

Branch: refs/heads/branch-1.4
Commit: ecf0d8a9f1eaf157433483090571ceaaee0b3f2e
Parents: 863ec0c
Author: Burak Yavuz brk...@gmail.com
Authored: Mon May 4 17:02:49 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 17:03:03 2015 -0700

--
 python/pyspark/sql/dataframe.py | 25 +
 python/pyspark/sql/tests.py |  9 
 .../spark/sql/DataFrameStatFunctions.scala  | 37 +
 .../sql/execution/stat/StatFunctions.scala  | 37 +++--
 .../apache/spark/sql/JavaDataFrameSuite.java| 28 ++
 .../apache/spark/sql/DataFrameStatSuite.scala   | 55 +---
 6 files changed, 160 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ecf0d8a9/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 22762c5..f30a92d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -931,6 +931,26 @@ class DataFrame(object):
 raise ValueError("col2 should be a string.")
 return self._jdf.stat().cov(col1, col2)
 
+def crosstab(self, col1, col2):
+"""
+Computes a pair-wise frequency table of the given columns. Also known 
as a contingency
+table. The number of distinct values for each column should be less 
than 1e4. The first
+column of each row will be the distinct values of `col1` and the 
column names will be the
+distinct values of `col2`. The name of the first column will be 
`$col1_$col2`. Pairs that
+have no occurrences will have `null` as their counts.
+:func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` 
are aliases.
+
+:param col1: The name of the first column. Distinct items will make 
the first item of
+each row.
+:param col2: The name of the second column. Distinct items will make 
the column names
+of the DataFrame.
+"""
+if not isinstance(col1, str):
+raise ValueError("col1 should be a string.")
+if not isinstance(col2, str):
+raise ValueError("col2 should be a string.")
+return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx)
+
 def freqItems(self, cols, support=None):
 
 Finding frequent items for columns, possibly with false positives. 
Using the
@@ -1423,6 +1443,11 @@ class DataFrameStatFunctions(object):
 
 cov.__doc__ = DataFrame.cov.__doc__
 
+def crosstab(self, col1, col2):
+return self.df.crosstab(col1, col2)
+
+crosstab.__doc__ = DataFrame.crosstab.__doc__
+
 def freqItems(self, cols, support=None):
 return self.df.freqItems(cols, support)
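
For reference, a hypothetical usage sketch of the crosstab API added above; it assumes a SparkContext `sc` and a SQLContext `sqlContext`, uses illustrative column names, and is not part of the patch:

    from pyspark.sql import Row
    df = sqlContext.createDataFrame(sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]))
    df.stat.crosstab("a", "b").show()
    # The first column is named "a_b" and holds the distinct values of `a`;
    # the remaining columns are the distinct values of `b` and hold the pair counts.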
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf0d8a9/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d652c30..7ea6656 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -405,6 +405,15 @@ class SQLTests(ReusedPySparkTestCase):
 cov = df.stat.cov("a", "b")
 self.assertTrue(abs(cov - 55.0 / 3) < 1e-6)
 
+def test_crosstab(self):
+df = self.sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 
7)]).toDF()
+ct = df.stat.crosstab("a", "b").collect()
+ct = sorted(ct, key=lambda x: x[0])
+for i, row in enumerate(ct):
+self.assertEqual(row[0], str(i))
+self.assertTrue(row[1], 1)
+ 

spark git commit: [SPARK-7243][SQL] Contingency Tables for DataFrames

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fc8b58195 - 805541117


[SPARK-7243][SQL] Contingency Tables for DataFrames

Computes a pair-wise frequency table of the given columns. Also known as 
cross-tabulation.
cc mengxr rxin

Author: Burak Yavuz brk...@gmail.com

Closes #5842 from brkyvz/df-cont and squashes the following commits:

a07c01e [Burak Yavuz] addressed comments v4.1
ae9e01d [Burak Yavuz] fix test
9106585 [Burak Yavuz] addressed comments v4.0
bced829 [Burak Yavuz] fix merge conflicts
a63ad00 [Burak Yavuz] addressed comments v3.0
a0cad97 [Burak Yavuz] addressed comments v3.0
6805df8 [Burak Yavuz] addressed comments and fixed test
939b7c4 [Burak Yavuz] lint python
7f098bc [Burak Yavuz] add crosstab pyTest
fd53b00 [Burak Yavuz] added python support for crosstab
27a5a81 [Burak Yavuz] implemented crosstab


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80554111
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80554111
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80554111

Branch: refs/heads/master
Commit: 80554111703c08e2bedbe303e04ecd162ec119e1
Parents: fc8b581
Author: Burak Yavuz brk...@gmail.com
Authored: Mon May 4 17:02:49 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 17:02:49 2015 -0700

--
 python/pyspark/sql/dataframe.py | 25 +
 python/pyspark/sql/tests.py |  9 
 .../spark/sql/DataFrameStatFunctions.scala  | 37 +
 .../sql/execution/stat/StatFunctions.scala  | 37 +++--
 .../apache/spark/sql/JavaDataFrameSuite.java| 28 ++
 .../apache/spark/sql/DataFrameStatSuite.scala   | 55 +---
 6 files changed, 160 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80554111/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 22762c5..f30a92d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -931,6 +931,26 @@ class DataFrame(object):
 raise ValueError("col2 should be a string.")
 return self._jdf.stat().cov(col1, col2)
 
+def crosstab(self, col1, col2):
+"""
+Computes a pair-wise frequency table of the given columns. Also known 
as a contingency
+table. The number of distinct values for each column should be less 
than 1e4. The first
+column of each row will be the distinct values of `col1` and the 
column names will be the
+distinct values of `col2`. The name of the first column will be 
`$col1_$col2`. Pairs that
+have no occurrences will have `null` as their counts.
+:func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` 
are aliases.
+
+:param col1: The name of the first column. Distinct items will make 
the first item of
+each row.
+:param col2: The name of the second column. Distinct items will make 
the column names
+of the DataFrame.
+"""
+if not isinstance(col1, str):
+raise ValueError("col1 should be a string.")
+if not isinstance(col2, str):
+raise ValueError("col2 should be a string.")
+return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx)
+
 def freqItems(self, cols, support=None):
 
 Finding frequent items for columns, possibly with false positives. 
Using the
@@ -1423,6 +1443,11 @@ class DataFrameStatFunctions(object):
 
 cov.__doc__ = DataFrame.cov.__doc__
 
+def crosstab(self, col1, col2):
+return self.df.crosstab(col1, col2)
+
+crosstab.__doc__ = DataFrame.crosstab.__doc__
+
 def freqItems(self, cols, support=None):
 return self.df.freqItems(cols, support)
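
Complementing the test_crosstab case added below, a hypothetical sketch of inspecting the collected crosstab rows; it assumes a SparkContext `sc` and a SQLContext `sqlContext` and is not part of the patch:

    from pyspark.sql import Row
    df = sqlContext.createDataFrame(sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]))
    rows = sorted(df.stat.crosstab("a", "b").collect(), key=lambda r: r[0])
    for row in rows:
        print(row[0], row.asDict())   # row[0] is the "a_b" value; the remaining fields are counts per value of `b`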
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80554111/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d652c30..7ea6656 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -405,6 +405,15 @@ class SQLTests(ReusedPySparkTestCase):
 cov = df.stat.cov("a", "b")
 self.assertTrue(abs(cov - 55.0 / 3) < 1e-6)
 
+def test_crosstab(self):
+df = self.sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 
7)]).toDF()
+ct = df.stat.crosstab("a", "b").collect()
+ct = sorted(ct, key=lambda x: x[0])
+for i, row in enumerate(ct):
+self.assertEqual(row[0], str(i))
+self.assertTrue(row[1], 1)
+self.assertTrue(row[2], 1)
+
 def test_math_functions(self):
 df = self.sc.parallelize([Row(a=i, b=2 * i) 

spark git commit: [SPARK-7319][SQL] Improve the output from DataFrame.show()

2015-05-04 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 893b3103f - 34edaa8ac


[SPARK-7319][SQL] Improve the output from DataFrame.show()

Author: 云峤 chensong...@alibaba-inc.com

Closes #5865 from kaka1992/df.show and squashes the following commits:

c79204b [云峤] Update
a1338f6 [云峤] Update python dataFrame show test and add empty df unit test.
734369c [云峤] Update python dataFrame show test and add empty df unit test.
84aec3e [云峤] Update python dataFrame show test and add empty df unit test.
159b3d5 [云峤] update
03ef434 [云峤] update
7394fd5 [云峤] update test show
ced487a [云峤] update pep8
b6e690b [云峤] Merge remote-tracking branch 'upstream/master' into df.show
30ac311 [云峤] [SPARK-7294] ADD BETWEEN
7d62368 [云峤] [SPARK-7294] ADD BETWEEN
baf839b [云峤] [SPARK-7294] ADD BETWEEN
d11d5b9 [云峤] [SPARK-7294] ADD BETWEEN

(cherry picked from commit f32e69ecc333867fc966f65cd0aeaeddd43e0945)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34edaa8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34edaa8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34edaa8a

Branch: refs/heads/branch-1.4
Commit: 34edaa8ac2334258961b290adb29d540233ee2bf
Parents: 893b310
Author: 云峤 chensong...@alibaba-inc.com
Authored: Mon May 4 12:08:38 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon May 4 13:24:52 2015 -0700

--
 R/pkg/R/DataFrame.R |   2 +-
 R/pkg/inst/tests/test_sparkSQL.R|   2 +-
 python/pyspark/sql/dataframe.py | 105 ---
 .../scala/org/apache/spark/sql/DataFrame.scala  |  28 +++--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  19 
 5 files changed, 112 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34edaa8a/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b59b700..841e77e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -167,7 +167,7 @@ setMethod("isLocal",
 setMethod("showDF",
   signature(x = "DataFrame"),
   function(x, numRows = 20) {
-cat(callJMethod(x@sdf, "showString", numToInt(numRows)), "\n")
+callJMethod(x@sdf, "showString", numToInt(numRows))
   })
 
 #' show

http://git-wip-us.apache.org/repos/asf/spark/blob/34edaa8a/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index af7a6c5..f82e56f 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -641,7 +641,7 @@ test_that("toJSON() returns an RDD of the correct values", {
 
 test_that("showDF()", {
   df <- jsonFile(sqlCtx, jsonPath)
-  expect_output(showDF(df), "age  name   \nnull Michael\n30   Andy   \n19   Justin ")
+  expect_output(showDF(df), "+----+-------+\n| age|   name|\n+----+-------+\n|null|Michael|\n|  30|   Andy|\n|  19| Justin|\n+----+-------+\n")
 })
 
 test_that(isLocal(), {

http://git-wip-us.apache.org/repos/asf/spark/blob/34edaa8a/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index aac5b8c..22762c5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -275,9 +275,12 @@ class DataFrame(object):
>>> df
DataFrame[age: int, name: string]
>>> df.show()
-age name
-2   Alice
-5   Bob
++---+-----+
+|age| name|
++---+-----+
+|  2|Alice|
+|  5|  Bob|
++---+-----+
 
 print(self._jdf.showString(n))
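
A quick hypothetical check of the new grid-style output, reusing the doctest data above; it assumes a SQLContext named `sqlContext` and is not part of the patch:

    from pyspark.sql import Row
    df = sqlContext.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")])
    df.show()    # prints the boxed table shown in the updated doctest
    df.show(1)   # same format, limited to the first row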
 
@@ -591,12 +594,15 @@ class DataFrame(object):
 given, this function computes statistics for all numerical columns.
 
>>> df.describe().show()
-summary age
-count   2
-mean3.5
-stddev  1.5
-min 2
-max 5
++-------+---+
+|summary|age|
++-------+---+
+|  count|  2|
+|   mean|3.5|
+| stddev|1.5|
+|    min|  2|
+|    max|  5|
++-------+---+
 
 jdf = self._jdf.describe(self._jseq(cols))
 return DataFrame(jdf, self.sql_ctx)
@@ -801,12 +807,18 @@ class DataFrame(object):
 :param subset: optional list of column names to consider.
 
>>> df4.dropna().show()
-age height name
-10  80 Alice
++---+--+-+
+|age|height| name|
++---+--+-+
+| 10|80|Alice|
++---+--+-+