svn commit: r1713691 - in /spark: _layouts/ site/ site/docs/ site/graphx/ site/mllib/ site/news/ site/releases/ site/screencasts/ site/sql/ site/streaming/
Author: rxin Date: Tue Nov 10 16:46:33 2015 New Revision: 1713691 URL: http://svn.apache.org/viewvc?rev=1713691=rev Log: update latest doc Modified: spark/_layouts/global.html spark/site/community.html spark/site/docs/latest spark/site/documentation.html spark/site/downloads.html spark/site/examples.html spark/site/faq.html spark/site/graphx/index.html spark/site/index.html spark/site/mailing-lists.html spark/site/mllib/index.html spark/site/news/amp-camp-2013-registration-ope.html spark/site/news/announcing-the-first-spark-summit.html spark/site/news/fourth-spark-screencast-published.html spark/site/news/index.html spark/site/news/nsdi-paper.html spark/site/news/one-month-to-spark-summit-2015.html spark/site/news/proposals-open-for-spark-summit-east.html spark/site/news/registration-open-for-spark-summit-east.html spark/site/news/run-spark-and-shark-on-amazon-emr.html spark/site/news/spark-0-6-1-and-0-5-2-released.html spark/site/news/spark-0-6-2-released.html spark/site/news/spark-0-7-0-released.html spark/site/news/spark-0-7-2-released.html spark/site/news/spark-0-7-3-released.html spark/site/news/spark-0-8-0-released.html spark/site/news/spark-0-8-1-released.html spark/site/news/spark-0-9-0-released.html spark/site/news/spark-0-9-1-released.html spark/site/news/spark-0-9-2-released.html spark/site/news/spark-1-0-0-released.html spark/site/news/spark-1-0-1-released.html spark/site/news/spark-1-0-2-released.html spark/site/news/spark-1-1-0-released.html spark/site/news/spark-1-1-1-released.html spark/site/news/spark-1-2-0-released.html spark/site/news/spark-1-2-1-released.html spark/site/news/spark-1-2-2-released.html spark/site/news/spark-1-3-0-released.html spark/site/news/spark-1-4-0-released.html spark/site/news/spark-1-4-1-released.html spark/site/news/spark-1-5-0-released.html spark/site/news/spark-1-5-1-released.html spark/site/news/spark-1-5-2-released.html spark/site/news/spark-accepted-into-apache-incubator.html spark/site/news/spark-and-shark-in-the-news.html spark/site/news/spark-becomes-tlp.html spark/site/news/spark-featured-in-wired.html spark/site/news/spark-mailing-lists-moving-to-apache.html spark/site/news/spark-meetups.html spark/site/news/spark-screencasts-published.html spark/site/news/spark-summit-2013-is-a-wrap.html spark/site/news/spark-summit-2014-videos-posted.html spark/site/news/spark-summit-2015-videos-posted.html spark/site/news/spark-summit-agenda-posted.html spark/site/news/spark-summit-east-2015-videos-posted.html spark/site/news/spark-summit-east-agenda-posted.html spark/site/news/spark-summit-europe-agenda-posted.html spark/site/news/spark-summit-europe.html spark/site/news/spark-tips-from-quantifind.html spark/site/news/spark-user-survey-and-powered-by-page.html spark/site/news/spark-version-0-6-0-released.html spark/site/news/spark-wins-daytona-gray-sort-100tb-benchmark.html spark/site/news/strata-exercises-now-available-online.html spark/site/news/submit-talks-to-spark-summit-2014.html spark/site/news/submit-talks-to-spark-summit-east-2016.html spark/site/news/two-weeks-to-spark-summit-2014.html spark/site/news/video-from-first-spark-development-meetup.html spark/site/releases/spark-release-0-3.html spark/site/releases/spark-release-0-5-0.html spark/site/releases/spark-release-0-5-1.html spark/site/releases/spark-release-0-5-2.html spark/site/releases/spark-release-0-6-0.html spark/site/releases/spark-release-0-6-1.html spark/site/releases/spark-release-0-6-2.html spark/site/releases/spark-release-0-7-0.html 
spark/site/releases/spark-release-0-7-2.html spark/site/releases/spark-release-0-7-3.html spark/site/releases/spark-release-0-8-0.html spark/site/releases/spark-release-0-8-1.html spark/site/releases/spark-release-0-9-0.html spark/site/releases/spark-release-0-9-1.html spark/site/releases/spark-release-0-9-2.html spark/site/releases/spark-release-1-0-0.html spark/site/releases/spark-release-1-0-1.html spark/site/releases/spark-release-1-0-2.html spark/site/releases/spark-release-1-1-0.html spark/site/releases/spark-release-1-1-1.html spark/site/releases/spark-release-1-2-0.html spark/site/releases/spark-release-1-2-1.html spark/site/releases/spark-release-1-2-2.html spark/site/releases/spark-release-1-3-0.html spark/site/releases/spark-release-1-3-1.html spark/site/releases/spark-release-1-4-0.html spark/site/releases/spark-release-1-4-1.html spark/site/releases/spark-release-1-5-0.html spark/site/releases/spark-release-1-5-1.html spark/site/releases/spark-release-1-5-2.html spark/site/research.html spark/site/screencasts/1-first-steps-with-spark.html
svn commit: r1713690 [2/3] - in /spark: ./ js/ news/_posts/ releases/_posts/ site/ site/graphx/ site/js/ site/mllib/ site/news/ site/releases/ site/screencasts/ site/sql/ site/streaming/
Added: spark/site/news/spark-1-5-2-released.html
URL: http://svn.apache.org/viewvc/spark/site/news/spark-1-5-2-released.html?rev=1713690&view=auto
==
--- spark/site/news/spark-1-5-2-released.html (added)
+++ spark/site/news/spark-1-5-2-released.html Tue Nov 10 16:44:07 2015
@@ -0,0 +1,197 @@
[Generated news page; the shared site chrome (head, Google Analytics snippet, navigation menu, news sidebar, and footer) is omitted. The page body reads:]
+Spark 1.5.2 released
+We are happy to announce the availability of Spark 1.5.2! This maintenance release includes fixes across several areas of Spark, including the DataFrame API, Spark Streaming, PySpark, R, Spark SQL, and MLlib.
+Visit the release notes to read about the new features, or download the release today.
Modified: spark/site/news/spark-accepted-into-apache-incubator.html
URL: http://svn.apache.org/viewvc/spark/site/news/spark-accepted-into-apache-incubator.html?rev=1713690&r1=1713689&r2=1713690&view=diff
==
--- spark/site/news/spark-accepted-into-apache-incubator.html (original)
+++ spark/site/news/spark-accepted-into-apache-incubator.html Tue Nov 10 16:44:07 2015
@@ -134,6 +134,9 @@ Latest News + Spark 1.5.2 released + (Nov 09, 2015) + Submission is open for Spark Summit East 2016 (Oct 14, 2015)
@@ -143,9 +146,6 @@ Spark 1.5.0 released (Sep 09, 2015) - Spark Summit Europe agenda posted - (Sep 07, 2015) - Archive

Modified: spark/site/news/spark-and-shark-in-the-news.html
URL: http://svn.apache.org/viewvc/spark/site/news/spark-and-shark-in-the-news.html?rev=1713690&r1=1713689&r2=1713690&view=diff
==
--- spark/site/news/spark-and-shark-in-the-news.html (original)
+++ spark/site/news/spark-and-shark-in-the-news.html Tue Nov 10 16:44:07 2015
@@ -134,6 +134,9 @@ Latest News
svn commit: r1713690 [3/3] - in /spark: ./ js/ news/_posts/ releases/_posts/ site/ site/graphx/ site/js/ site/mllib/ site/news/ site/releases/ site/screencasts/ site/sql/ site/streaming/
Modified: spark/site/releases/spark-release-1-2-1.html URL: http://svn.apache.org/viewvc/spark/site/releases/spark-release-1-2-1.html?rev=1713690=1713689=1713690=diff == --- spark/site/releases/spark-release-1-2-1.html (original) +++ spark/site/releases/spark-release-1-2-1.html Tue Nov 10 16:44:07 2015 @@ -134,6 +134,9 @@ Latest News + Spark 1.5.2 released + (Nov 09, 2015) + Submission is open for Spark Summit East 2016 (Oct 14, 2015) @@ -143,9 +146,6 @@ Spark 1.5.0 released (Sep 09, 2015) - Spark Summit Europe agenda posted - (Sep 07, 2015) - Archive Modified: spark/site/releases/spark-release-1-2-2.html URL: http://svn.apache.org/viewvc/spark/site/releases/spark-release-1-2-2.html?rev=1713690=1713689=1713690=diff == --- spark/site/releases/spark-release-1-2-2.html (original) +++ spark/site/releases/spark-release-1-2-2.html Tue Nov 10 16:44:07 2015 @@ -134,6 +134,9 @@ Latest News + Spark 1.5.2 released + (Nov 09, 2015) + Submission is open for Spark Summit East 2016 (Oct 14, 2015) @@ -143,9 +146,6 @@ Spark 1.5.0 released (Sep 09, 2015) - Spark Summit Europe agenda posted - (Sep 07, 2015) - Archive Modified: spark/site/releases/spark-release-1-3-0.html URL: http://svn.apache.org/viewvc/spark/site/releases/spark-release-1-3-0.html?rev=1713690=1713689=1713690=diff == --- spark/site/releases/spark-release-1-3-0.html (original) +++ spark/site/releases/spark-release-1-3-0.html Tue Nov 10 16:44:07 2015 @@ -134,6 +134,9 @@ Latest News + Spark 1.5.2 released + (Nov 09, 2015) + Submission is open for Spark Summit East 2016 (Oct 14, 2015) @@ -143,9 +146,6 @@ Spark 1.5.0 released (Sep 09, 2015) - Spark Summit Europe agenda posted - (Sep 07, 2015) - Archive @@ -175,7 +175,7 @@ To download Spark 1.3 visit the downloads page. Spark Core -Spark 1.3 sees a handful of usability improvements in the core engine. The core API now supports https://issues.apache.org/jira/browse/SPARK-5430;>multi level aggregation trees to help speed up expensive reduce operations. https://issues.apache.org/jira/browse/SPARK-5063;>Improved error reporting has been added for certain gotcha operations. Sparks Jetty dependency is https://issues.apache.org/jira/browse/SPARK-3996;>now shaded to help avoid conflicts with user programs. Spark now supports https://issues.apache.org/jira/browse/SPARK-3883;>SSL encryption for some communication endpoints. Finaly, realtime https://issues.apache.org/jira/browse/SPARK-3428;>GC metrics and https://issues.apache.org/jira/browse/SPARK-4874;>record counts have been added to the UI. +Spark 1.3 sees a handful of usability improvements in the core engine. The core API now supports https://issues.apache.org/jira/browse/SPARK-5430;>multi level aggregation trees to help speed up expensive reduce operations. https://issues.apache.org/jira/browse/SPARK-5063;>Improved error reporting has been added for certain gotcha operations. Sparks Jetty dependency is https://issues.apache.org/jira/browse/SPARK-3996;>now shaded to help avoid conflicts with user programs. Spark now supports https://issues.apache.org/jira/browse/SPARK-3883;>SSL encryption for some communication endpoints. Finaly, realtime https://issues.apache.org/jira/browse/SPARK-3428;>GC metrics and https://issues.apache.org/jira/browse/SPARK-4874;>record counts have been added to the UI. DataFrame API Spark 1.3 adds a new DataFrames API that provides powerful and convenient operators when working with structured datasets. The DataFrame is an evolution of the base RDD API that includes named fields along with schema information. 
It's easy to construct a DataFrame from sources such as Hive tables, JSON data, a JDBC database, or any implementation of Spark's new data source API. Data frames will become a common interchange format between Spark components and when importing and exporting data to other systems. Data frames are supported in Python, Scala, and Java. @@ -187,7 +187,7 @@ In this release Spark MLlib introduces several new algorithms: latent Dirichlet allocation (LDA) for https://issues.apache.org/jira/browse/SPARK-1405;>topic modeling, https://issues.apache.org/jira/browse/SPARK-2309;>multinomial logistic regression for multiclass classification,
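The release-notes excerpt above describes the DataFrame API introduced in Spark 1.3 and the sources a DataFrame can be built from. As a rough illustration of those construction paths, here is a minimal Scala sketch against a 1.5-era SQLContext; the file path and table name are hypothetical, not part of the commit.

```scala
import org.apache.spark.sql.SQLContext

// Assumes an existing SparkContext named `sc`.
val sqlContext = new SQLContext(sc)

// From JSON data: the schema is inferred from the records.
val peopleFromJson = sqlContext.read.json("examples/src/main/resources/people.json")

// From a table registered in the catalog (e.g. a Hive table when using HiveContext).
val peopleFromTable = sqlContext.table("people")

peopleFromJson.printSchema()
peopleFromJson.select("name").show()
```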
svn commit: r1713690 [1/3] - in /spark: ./ js/ news/_posts/ releases/_posts/ site/ site/graphx/ site/js/ site/mllib/ site/news/ site/releases/ site/screencasts/ site/sql/ site/streaming/
Author: rxin Date: Tue Nov 10 16:44:07 2015 New Revision: 1713690 URL: http://svn.apache.org/viewvc?rev=1713690=rev Log: 1.5.2 release Added: spark/news/_posts/2015-11-09-spark-1-5-2-released.md spark/releases/_posts/2015-11-09-spark-release-1-5-2.md spark/site/news/spark-1-5-2-released.html spark/site/releases/spark-release-1-5-2.html Modified: spark/documentation.md spark/downloads.md spark/js/downloads.js spark/site/community.html spark/site/documentation.html spark/site/downloads.html spark/site/examples.html spark/site/faq.html spark/site/graphx/index.html spark/site/index.html spark/site/js/downloads.js spark/site/mailing-lists.html spark/site/mllib/index.html spark/site/news/amp-camp-2013-registration-ope.html spark/site/news/announcing-the-first-spark-summit.html spark/site/news/fourth-spark-screencast-published.html spark/site/news/index.html spark/site/news/nsdi-paper.html spark/site/news/one-month-to-spark-summit-2015.html spark/site/news/proposals-open-for-spark-summit-east.html spark/site/news/registration-open-for-spark-summit-east.html spark/site/news/run-spark-and-shark-on-amazon-emr.html spark/site/news/spark-0-6-1-and-0-5-2-released.html spark/site/news/spark-0-6-2-released.html spark/site/news/spark-0-7-0-released.html spark/site/news/spark-0-7-2-released.html spark/site/news/spark-0-7-3-released.html spark/site/news/spark-0-8-0-released.html spark/site/news/spark-0-8-1-released.html spark/site/news/spark-0-9-0-released.html spark/site/news/spark-0-9-1-released.html spark/site/news/spark-0-9-2-released.html spark/site/news/spark-1-0-0-released.html spark/site/news/spark-1-0-1-released.html spark/site/news/spark-1-0-2-released.html spark/site/news/spark-1-1-0-released.html spark/site/news/spark-1-1-1-released.html spark/site/news/spark-1-2-0-released.html spark/site/news/spark-1-2-1-released.html spark/site/news/spark-1-2-2-released.html spark/site/news/spark-1-3-0-released.html spark/site/news/spark-1-4-0-released.html spark/site/news/spark-1-4-1-released.html spark/site/news/spark-1-5-0-released.html spark/site/news/spark-1-5-1-released.html spark/site/news/spark-accepted-into-apache-incubator.html spark/site/news/spark-and-shark-in-the-news.html spark/site/news/spark-becomes-tlp.html spark/site/news/spark-featured-in-wired.html spark/site/news/spark-mailing-lists-moving-to-apache.html spark/site/news/spark-meetups.html spark/site/news/spark-screencasts-published.html spark/site/news/spark-summit-2013-is-a-wrap.html spark/site/news/spark-summit-2014-videos-posted.html spark/site/news/spark-summit-2015-videos-posted.html spark/site/news/spark-summit-agenda-posted.html spark/site/news/spark-summit-east-2015-videos-posted.html spark/site/news/spark-summit-east-agenda-posted.html spark/site/news/spark-summit-europe-agenda-posted.html spark/site/news/spark-summit-europe.html spark/site/news/spark-tips-from-quantifind.html spark/site/news/spark-user-survey-and-powered-by-page.html spark/site/news/spark-version-0-6-0-released.html spark/site/news/spark-wins-daytona-gray-sort-100tb-benchmark.html spark/site/news/strata-exercises-now-available-online.html spark/site/news/submit-talks-to-spark-summit-2014.html spark/site/news/submit-talks-to-spark-summit-east-2016.html spark/site/news/two-weeks-to-spark-summit-2014.html spark/site/news/video-from-first-spark-development-meetup.html spark/site/releases/spark-release-0-3.html spark/site/releases/spark-release-0-5-0.html spark/site/releases/spark-release-0-5-1.html spark/site/releases/spark-release-0-5-2.html 
spark/site/releases/spark-release-0-6-0.html spark/site/releases/spark-release-0-6-1.html spark/site/releases/spark-release-0-6-2.html spark/site/releases/spark-release-0-7-0.html spark/site/releases/spark-release-0-7-2.html spark/site/releases/spark-release-0-7-3.html spark/site/releases/spark-release-0-8-0.html spark/site/releases/spark-release-0-8-1.html spark/site/releases/spark-release-0-9-0.html spark/site/releases/spark-release-0-9-1.html spark/site/releases/spark-release-0-9-2.html spark/site/releases/spark-release-1-0-0.html spark/site/releases/spark-release-1-0-1.html spark/site/releases/spark-release-1-0-2.html spark/site/releases/spark-release-1-1-0.html spark/site/releases/spark-release-1-1-1.html spark/site/releases/spark-release-1-2-0.html spark/site/releases/spark-release-1-2-1.html spark/site/releases/spark-release-1-2-2.html spark/site/releases/spark-release-1-3-0.html spark/site/releases/spark-release-1-3-1.html spark/site/releases/spark-release-1-4-0.html spark/site/releases/spark-release-1-4-1.html
spark git commit: [SPARK-10863][SPARKR] Method coltypes() (New version)
Repository: spark Updated Branches: refs/heads/branch-1.6 7c4ade0d7 -> d2405cb5e [SPARK-10863][SPARKR] Method coltypes() (New version) This is a follow up on PR #8984, as the corresponding branch for such PR was damaged. Author: Oscar D. Lara YejasCloses #9579 from olarayej/SPARK-10863_NEW14. (cherry picked from commit 47735cdc2a878cfdbe76316d3ff8314a45dabf54) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2405cb5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2405cb5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2405cb5 Branch: refs/heads/branch-1.6 Commit: d2405cb5e1e219ca38ed5debf360191df84dd94b Parents: 7c4ade0 Author: Oscar D. Lara Yejas Authored: Tue Nov 10 11:07:57 2015 -0800 Committer: Shivaram Venkataraman Committed: Tue Nov 10 11:08:08 2015 -0800 -- R/pkg/DESCRIPTION| 1 + R/pkg/NAMESPACE | 6 ++--- R/pkg/R/DataFrame.R | 49 +++ R/pkg/R/generics.R | 4 +++ R/pkg/R/schema.R | 15 +-- R/pkg/R/types.R | 43 ++ R/pkg/inst/tests/test_sparkSQL.R | 24 - 7 files changed, 124 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2405cb5/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 3d6edb7..369714f 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -34,4 +34,5 @@ Collate: 'serialize.R' 'sparkR.R' 'stats.R' +'types.R' 'utils.R' http://git-wip-us.apache.org/repos/asf/spark/blob/d2405cb5/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 56b8ed0..52fd6c9 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -23,9 +23,11 @@ export("setJobGroup", exportClasses("DataFrame") exportMethods("arrange", + "as.data.frame", "attach", "cache", "collect", + "coltypes", "columns", "count", "cov", @@ -262,6 +264,4 @@ export("structField", "structType", "structType.jobj", "structType.structField", - "print.structType") - -export("as.data.frame") + "print.structType") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d2405cb5/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index e9013aa..cc86806 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2152,3 +2152,52 @@ setMethod("with", newEnv <- assignNewEnv(data) eval(substitute(expr), envir = newEnv, enclos = newEnv) }) + +#' Returns the column types of a DataFrame. 
+#' +#' @name coltypes +#' @title Get column types of a DataFrame +#' @family dataframe_funcs +#' @param x (DataFrame) +#' @return value (character) A character vector with the column types of the given DataFrame +#' @rdname coltypes +#' @examples \dontrun{ +#' irisDF <- createDataFrame(sqlContext, iris) +#' coltypes(irisDF) +#' } +setMethod("coltypes", + signature(x = "DataFrame"), + function(x) { +# Get the data types of the DataFrame by invoking dtypes() function +types <- sapply(dtypes(x), function(x) {x[[2]]}) + +# Map Spark data types into R's data types using DATA_TYPES environment +rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) { + + # Check for primitive types + type <- PRIMITIVE_TYPES[[x]] + + if (is.null(type)) { +# Check for complex types +for (t in names(COMPLEX_TYPES)) { + if (substring(x, 1, nchar(t)) == t) { +type <- COMPLEX_TYPES[[t]] +break + } +} + +if (is.null(type)) { + stop(paste("Unsupported data type: ", x)) +} + } + type +}) + +# Find which types don't have mapping to R +naIndices <- which(is.na(rTypes)) + +# Assign the original scala data types to the unmatched ones +rTypes[naIndices] <- types[naIndices] + +rTypes + }) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d2405cb5/R/pkg/R/generics.R
spark git commit: [SPARK-10863][SPARKR] Method coltypes() (New version)
Repository: spark Updated Branches: refs/heads/master e0701c756 -> 47735cdc2 [SPARK-10863][SPARKR] Method coltypes() (New version) This is a follow up on PR #8984, as the corresponding branch for such PR was damaged. Author: Oscar D. Lara YejasCloses #9579 from olarayej/SPARK-10863_NEW14. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47735cdc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47735cdc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47735cdc Branch: refs/heads/master Commit: 47735cdc2a878cfdbe76316d3ff8314a45dabf54 Parents: e0701c7 Author: Oscar D. Lara Yejas Authored: Tue Nov 10 11:07:57 2015 -0800 Committer: Shivaram Venkataraman Committed: Tue Nov 10 11:07:57 2015 -0800 -- R/pkg/DESCRIPTION| 1 + R/pkg/NAMESPACE | 6 ++--- R/pkg/R/DataFrame.R | 49 +++ R/pkg/R/generics.R | 4 +++ R/pkg/R/schema.R | 15 +-- R/pkg/R/types.R | 43 ++ R/pkg/inst/tests/test_sparkSQL.R | 24 - 7 files changed, 124 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47735cdc/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 3d6edb7..369714f 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -34,4 +34,5 @@ Collate: 'serialize.R' 'sparkR.R' 'stats.R' +'types.R' 'utils.R' http://git-wip-us.apache.org/repos/asf/spark/blob/47735cdc/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 56b8ed0..52fd6c9 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -23,9 +23,11 @@ export("setJobGroup", exportClasses("DataFrame") exportMethods("arrange", + "as.data.frame", "attach", "cache", "collect", + "coltypes", "columns", "count", "cov", @@ -262,6 +264,4 @@ export("structField", "structType", "structType.jobj", "structType.structField", - "print.structType") - -export("as.data.frame") + "print.structType") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/47735cdc/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index e9013aa..cc86806 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2152,3 +2152,52 @@ setMethod("with", newEnv <- assignNewEnv(data) eval(substitute(expr), envir = newEnv, enclos = newEnv) }) + +#' Returns the column types of a DataFrame. 
+#' +#' @name coltypes +#' @title Get column types of a DataFrame +#' @family dataframe_funcs +#' @param x (DataFrame) +#' @return value (character) A character vector with the column types of the given DataFrame +#' @rdname coltypes +#' @examples \dontrun{ +#' irisDF <- createDataFrame(sqlContext, iris) +#' coltypes(irisDF) +#' } +setMethod("coltypes", + signature(x = "DataFrame"), + function(x) { +# Get the data types of the DataFrame by invoking dtypes() function +types <- sapply(dtypes(x), function(x) {x[[2]]}) + +# Map Spark data types into R's data types using DATA_TYPES environment +rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) { + + # Check for primitive types + type <- PRIMITIVE_TYPES[[x]] + + if (is.null(type)) { +# Check for complex types +for (t in names(COMPLEX_TYPES)) { + if (substring(x, 1, nchar(t)) == t) { +type <- COMPLEX_TYPES[[t]] +break + } +} + +if (is.null(type)) { + stop(paste("Unsupported data type: ", x)) +} + } + type +}) + +# Find which types don't have mapping to R +naIndices <- which(is.na(rTypes)) + +# Assign the original scala data types to the unmatched ones +rTypes[naIndices] <- types[naIndices] + +rTypes + }) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/47735cdc/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index efef7d6..89731af 100644 --- a/R/pkg/R/generics.R +++
spark git commit: [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation
Repository: spark Updated Branches: refs/heads/master 47735cdc2 -> dfcfcbcc0 [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation Currently the user facing api for typed aggregation has some limitations: * the customized typed aggregation must be the first of aggregation list * the customized typed aggregation can only use long as buffer type * the customized typed aggregation can only use flat type as result type This PR tries to remove these limitations. Author: Wenchen Fan. Closes #9599 from cloud-fan/agg. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfcfcbcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfcfcbcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfcfcbcc Branch: refs/heads/master Commit: dfcfcbcc0448ebc6f02eba6bf0495832a321c87e Parents: 47735cd Author: Wenchen Fan Authored: Tue Nov 10 11:14:25 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:14:25 2015 -0800 -- .../catalyst/encoders/ExpressionEncoder.scala | 6 +++ .../aggregate/TypedAggregateExpression.scala| 50 +-- .../spark/sql/expressions/Aggregator.scala | 5 ++ .../spark/sql/DatasetAggregatorSuite.scala | 52 4 files changed, 99 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dfcfcbcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index c287aeb..005c062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -185,6 +185,12 @@ case class ExpressionEncoder[T]( }) } + def shift(delta: Int): ExpressionEncoder[T] = { +copy(constructExpression = constructExpression transform { + case r: BoundReference => r.copy(ordinal = r.ordinal + delta) +}) + } + /** * Returns a copy of this encoder where the expressions used to create an object given an * input row have been modified to pull the object out from a nested struct, instead of the http://git-wip-us.apache.org/repos/asf/spark/blob/dfcfcbcc/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala index 24d8122..0e5bc1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate import scala.language.existentials import org.apache.spark.Logging +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate -import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{StructType, DataType} +import org.apache.spark.sql.types._ object
TypedAggregateExpression { def apply[A, B : Encoder, C : Encoder]( @@ -67,8 +67,11 @@ case class TypedAggregateExpression( override def nullable: Boolean = true - // TODO: this assumes flat results... - override def dataType: DataType = cEncoder.schema.head.dataType + override def dataType: DataType = if (cEncoder.flat) { +cEncoder.schema.head.dataType + } else { +cEncoder.schema + } override def deterministic: Boolean = true @@ -93,32 +96,51 @@ case class TypedAggregateExpression( case a: AttributeReference => inputMapping(a) }) - // TODO: this probably only works when we are in the first column. val bAttributes = bEncoder.schema.toAttributes lazy val boundB = bEncoder.resolve(bAttributes).bind(bAttributes) + private def updateBuffer(buffer: MutableRow, value: InternalRow): Unit = { +// todo: need a more neat way to assign the value. +var i = 0 +while (i < aggBufferAttributes.length) { +
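The limitations listed in the commit message above concern which aggregator shapes the typed API accepts. Below is a hedged sketch of the kind of aggregator this change is meant to support (a non-Long tuple buffer and a non-flat tuple result), assuming the zero/reduce/merge/finish contract of org.apache.spark.sql.expressions.Aggregator in the 1.6 line; the object name and the input Dataset are illustrative, not the PR's own test code.

```scala
import org.apache.spark.sql.expressions.Aggregator

// Counts rows and sums the Int field in a single pass: both the buffer and
// the result are (Long, Long) tuples rather than a single flat Long.
object CountAndSum extends Aggregator[(String, Int), (Long, Long), (Long, Long)] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: (String, Int)): (Long, Long) = (b._1 + 1, b._2 + a._2)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(reduction: (Long, Long)): (Long, Long) = reduction
}

// Usage sketch, assuming `ds: Dataset[(String, Int)]` and the needed implicit encoders in scope:
// ds.groupBy(_._1).agg(CountAndSum.toColumn)
```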
spark git commit: [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation
Repository: spark Updated Branches: refs/heads/branch-1.6 d2405cb5e -> 6e2e84f3e [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation Currently the user facing api for typed aggregation has some limitations: * the customized typed aggregation must be the first of aggregation list * the customized typed aggregation can only use long as buffer type * the customized typed aggregation can only use flat type as result type This PR tries to remove these limitations. Author: Wenchen FanCloses #9599 from cloud-fan/agg. (cherry picked from commit dfcfcbcc0448ebc6f02eba6bf0495832a321c87e) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e2e84f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e2e84f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e2e84f3 Branch: refs/heads/branch-1.6 Commit: 6e2e84f3ea06df459c7fdda947f2c56c2869145d Parents: d2405cb Author: Wenchen Fan Authored: Tue Nov 10 11:14:25 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:14:34 2015 -0800 -- .../catalyst/encoders/ExpressionEncoder.scala | 6 +++ .../aggregate/TypedAggregateExpression.scala| 50 +-- .../spark/sql/expressions/Aggregator.scala | 5 ++ .../spark/sql/DatasetAggregatorSuite.scala | 52 4 files changed, 99 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e2e84f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index c287aeb..005c062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -185,6 +185,12 @@ case class ExpressionEncoder[T]( }) } + def shift(delta: Int): ExpressionEncoder[T] = { +copy(constructExpression = constructExpression transform { + case r: BoundReference => r.copy(ordinal = r.ordinal + delta) +}) + } + /** * Returns a copy of this encoder where the expressions used to create an object given an * input row have been modified to pull the object out from a nested struct, instead of the http://git-wip-us.apache.org/repos/asf/spark/blob/6e2e84f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala index 24d8122..0e5bc1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate import scala.language.existentials import org.apache.spark.Logging +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate -import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ -import 
org.apache.spark.sql.types.{StructType, DataType} +import org.apache.spark.sql.types._ object TypedAggregateExpression { def apply[A, B : Encoder, C : Encoder]( @@ -67,8 +67,11 @@ case class TypedAggregateExpression( override def nullable: Boolean = true - // TODO: this assumes flat results... - override def dataType: DataType = cEncoder.schema.head.dataType + override def dataType: DataType = if (cEncoder.flat) { +cEncoder.schema.head.dataType + } else { +cEncoder.schema + } override def deterministic: Boolean = true @@ -93,32 +96,51 @@ case class TypedAggregateExpression( case a: AttributeReference => inputMapping(a) }) - // TODO: this probably only works when we are in the first column. val bAttributes = bEncoder.schema.toAttributes lazy val boundB = bEncoder.resolve(bAttributes).bind(bAttributes) + private def updateBuffer(buffer: MutableRow, value: InternalRow):
spark git commit: [SPARK-10371][SQL] Implement subexpr elimination for UnsafeProjections
Repository: spark Updated Branches: refs/heads/master 53600854c -> 87aedc48c [SPARK-10371][SQL] Implement subexpr elimination for UnsafeProjections This patch adds the building blocks for codegening subexpr elimination and implements it end to end for UnsafeProjection. The building blocks can be used to do the same thing for other operators. It introduces some utilities to compute common sub expressions. Expressions can be added to this data structure. The expr and its children will be recursively matched against existing expressions (ones previously added) and grouped into common groups. This is built using the existing `semanticEquals`. It does not understand things like commutative or associative expressions. This can be done as future work. After building this data structure, the codegen process takes advantage of it by: 1. Generating a helper function in the generated class that computes the common subexpression. This is done for all common subexpressions that have at least two occurrences and the expression tree is sufficiently complex. 2. When generating the apply() function, if the helper function exists, call that instead of regenerating the expression tree. Repeated calls to the helper function short-circuit the evaluation logic. Author: Nong Li. Author: Nong Li. This patch had conflicts when merged, resolved by Committer: Michael Armbrust. Closes #9480 from nongli/spark-10371. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87aedc48 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87aedc48 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87aedc48 Branch: refs/heads/master Commit: 87aedc48c01dffbd880e6ca84076ed47c68f88d0 Parents: 5360085 Author: Nong Li Authored: Tue Nov 10 11:28:53 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:28:53 2015 -0800 -- .../expressions/EquivalentExpressions.scala | 106 + .../sql/catalyst/expressions/Expression.scala | 50 +- .../sql/catalyst/expressions/Projection.scala | 16 ++ .../expressions/codegen/CodeGenerator.scala | 110 - .../codegen/GenerateUnsafeProjection.scala | 36 - .../catalyst/expressions/namedExpressions.scala | 4 + .../SubexpressionEliminationSuite.scala | 153 +++ .../scala/org/apache/spark/sql/SQLConf.scala| 8 + .../apache/spark/sql/execution/SparkPlan.scala | 5 + .../spark/sql/execution/basicOperators.scala| 3 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 48 ++ 11 files changed, 523 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/87aedc48/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala new file mode 100644 index 000..e7380d2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import scala.collection.mutable + +/** + * This class is used to compute equality of (sub)expression trees. Expressions can be added + * to this class and they subsequently query for expression equality. Expression trees are + * considered equal if for the same input(s), the same result is produced. + */ +class EquivalentExpressions { + /** + * Wrapper around an Expression that provides semantic equality. + */ + case class Expr(e: Expression) { +val hash = e.semanticHash() +override def equals(o: Any): Boolean = o match { + case other: Expr =>
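The grouping idea described in the commit message (recursively add an expression and its children, bucket them by semantic equality, and treat non-trivial subtrees seen at least twice as candidates for a shared helper) can be illustrated with a small self-contained sketch. This is a toy model that uses structural equality on case classes in place of Catalyst's semanticEquals; it is not Spark's actual EquivalentExpressions implementation.

```scala
import scala.collection.mutable

// Toy expression tree standing in for Catalyst expressions.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class Mul(l: Expr, r: Expr) extends Expr

// Recursively register an expression and its children, counting occurrences of
// each equivalent subtree; non-leaf subtrees seen at least twice would become
// shared helper functions in the generated code.
class EquivalenceGroups {
  private val counts = mutable.Map.empty[Expr, Int]

  def addTree(e: Expr): Unit = {
    counts(e) = counts.getOrElse(e, 0) + 1
    e match {
      case Add(l, r) => addTree(l); addTree(r)
      case Mul(l, r) => addTree(l); addTree(r)
      case _: Attr   => // leaf: nothing to recurse into
    }
  }

  def commonSubexprs: Seq[Expr] =
    counts.collect { case (e, n) if n >= 2 && !e.isInstanceOf[Attr] => e }.toSeq
}

object SubexprDemo extends App {
  val shared = Add(Attr("a"), Attr("b"))   // (a + b) appears in both expressions below
  val groups = new EquivalenceGroups
  Seq(Mul(shared, Attr("c")), Mul(shared, Attr("d"))).foreach(groups.addTree)
  println(groups.commonSubexprs)           // List(Add(Attr(a),Attr(b)))
}
```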
spark git commit: [SPARK-10371][SQL] Implement subexpr elimination for UnsafeProjections
Repository: spark Updated Branches: refs/heads/branch-1.6 5ccc1eb08 -> f38509a76 [SPARK-10371][SQL] Implement subexpr elimination for UnsafeProjections This patch adds the building blocks for codegening subexpr elimination and implements it end to end for UnsafeProjection. The building blocks can be used to do the same thing for other operators. It introduces some utilities to compute common sub expressions. Expressions can be added to this data structure. The expr and its children will be recursively matched against existing expressions (ones previously added) and grouped into common groups. This is built using the existing `semanticEquals`. It does not understand things like commutative or associative expressions. This can be done as future work. After building this data structure, the codegen process takes advantage of it by: 1. Generating a helper function in the generated class that computes the common subexpression. This is done for all common subexpressions that have at least two occurrences and the expression tree is sufficiently complex. 2. When generating the apply() function, if the helper function exists, call that instead of regenerating the expression tree. Repeated calls to the helper function shortcircuit the evaluation logic. Author: Nong LiAuthor: Nong Li This patch had conflicts when merged, resolved by Committer: Michael Armbrust Closes #9480 from nongli/spark-10371. (cherry picked from commit 87aedc48c01dffbd880e6ca84076ed47c68f88d0) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f38509a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f38509a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f38509a7 Branch: refs/heads/branch-1.6 Commit: f38509a763816f43a224653fe65e4645894c9fc4 Parents: 5ccc1eb Author: Nong Li Authored: Tue Nov 10 11:28:53 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:29:05 2015 -0800 -- .../expressions/EquivalentExpressions.scala | 106 + .../sql/catalyst/expressions/Expression.scala | 50 +- .../sql/catalyst/expressions/Projection.scala | 16 ++ .../expressions/codegen/CodeGenerator.scala | 110 - .../codegen/GenerateUnsafeProjection.scala | 36 - .../catalyst/expressions/namedExpressions.scala | 4 + .../SubexpressionEliminationSuite.scala | 153 +++ .../scala/org/apache/spark/sql/SQLConf.scala| 8 + .../apache/spark/sql/execution/SparkPlan.scala | 5 + .../spark/sql/execution/basicOperators.scala| 3 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 48 ++ 11 files changed, 523 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f38509a7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala new file mode 100644 index 000..e7380d2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import scala.collection.mutable + +/** + * This class is used to compute equality of (sub)expression trees. Expressions can be added + * to this class and they subsequently query for expression equality. Expression trees are + * considered equal if for the same input(s), the same result is produced. + */ +class EquivalentExpressions { + /** + * Wrapper around an Expression that provides semantic equality. + */ + case class
spark git commit: [SPARK-7841][BUILD] Stop using retrieveManaged to retrieve dependencies in SBT
Repository: spark Updated Branches: refs/heads/branch-1.6 6c1409e0b -> 4dad3c51b [SPARK-7841][BUILD] Stop using retrieveManaged to retrieve dependencies in SBT This patch modifies Spark's SBT build so that it no longer uses `retrieveManaged` / `lib_managed` to store its dependencies. The motivations for this change are nicely described on the JIRA ticket ([SPARK-7841](https://issues.apache.org/jira/browse/SPARK-7841)); my personal interest in doing this stems from the fact that `lib_managed` has caused me some pain while debugging dependency issues in another PR of mine. Removing our use of `lib_managed` would be trivial except for one snag: the Datanucleus JARs, required by Spark SQL's Hive integration, cannot be included in assembly JARs due to problems with merging OSGI `plugin.xml` files. As a result, several places in the packaging and deployment pipeline assume that these Datanucleus JARs are copied to `lib_managed/jars`. In the interest of maintaining compatibility, I have chosen to retain the `lib_managed/jars` directory _only_ for these Datanucleus JARs and have added custom code to `SparkBuild.scala` to automatically copy those JARs to that folder as part of the `assembly` task. `dev/mima` also depended on `lib_managed` in a hacky way in order to set classpaths when generating MiMa excludes; I've updated this to obtain the classpaths directly from SBT instead. /cc dragos marmbrus pwendell srowen Author: Josh Rosen. Closes #9575 from JoshRosen/SPARK-7841. (cherry picked from commit 689386b1c60997e4505749915f7005a52c207de2) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dad3c51 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dad3c51 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dad3c51 Branch: refs/heads/branch-1.6 Commit: 4dad3c51b6aa43e95e71fb440d1caa3fae10d8fc Parents: 6c1409e Author: Josh Rosen Authored: Tue Nov 10 10:14:19 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 10:14:28 2015 -0800 -- dev/mima | 2 +- project/SparkBuild.scala | 22 +- 2 files changed, 18 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4dad3c51/dev/mima -- diff --git a/dev/mima b/dev/mima index 2952fa6..d5baffc 100755 --- a/dev/mima +++ b/dev/mima @@ -38,7 +38,7 @@ generate_mima_ignore() { # it did not process the new classes (which are in assembly jar).
generate_mima_ignore -export SPARK_CLASSPATH="`find lib_managed \( -name '*spark*jar' -a -type f \) | tr "\\n" ":"`" +export SPARK_CLASSPATH="$(build/sbt "export oldDeps/fullClasspath" | tail -n1)" echo "SPARK_CLASSPATH=$SPARK_CLASSPATH" generate_mima_ignore http://git-wip-us.apache.org/repos/asf/spark/blob/4dad3c51/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b75ed13..a9fb741 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -16,6 +16,7 @@ */ import java.io._ +import java.nio.file.Files import scala.util.Properties import scala.collection.JavaConverters._ @@ -135,8 +136,6 @@ object SparkBuild extends PomBuild { .orElse(sys.props.get("java.home").map { p => new File(p).getParentFile().getAbsolutePath() }) .map(file), incOptions := incOptions.value.withNameHashing(true), -retrieveManaged := true, -retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", publishMavenStyle := true, unidocGenjavadocVersion := "0.9-spark0", @@ -326,8 +325,6 @@ object OldDeps { def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq( name := "old-deps", scalaVersion := "2.10.5", -retrieveManaged := true, -retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq", "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter", "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx", @@ -404,6 +401,8 @@ object Assembly { val hadoopVersion = taskKey[String]("The version of hadoop that spark is compiled against.") + val deployDatanucleusJars = taskKey[Unit]("Deploy datanucleus jars to the spark/lib_managed/jars directory") + lazy val settings = assemblySettings ++ Seq( test in assembly := {}, hadoopVersion := { @@ -429,7 +428,20 @@ object Assembly { case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
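The commit message above explains that only the Datanucleus JARs still need to land in lib_managed/jars, via custom code in SparkBuild.scala hooked into the assembly task; the diff shows the deployDatanucleusJars task key but its body is cut off in this excerpt. The following is a rough, hypothetical sbt sketch of how such a copy task could be wired, not the actual SparkBuild.scala implementation.

```scala
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical sketch: copy the datanucleus-* jars from the managed classpath
// into lib_managed/jars so downstream packaging scripts keep finding them there.
val deployDatanucleusJars = taskKey[Unit](
  "Deploy datanucleus jars to the spark/lib_managed/jars directory")

deployDatanucleusJars := {
  val datanucleusJars = (managedClasspath in Compile).value
    .map(_.data)
    .filter(_.getName.startsWith("datanucleus"))
  val destDir = baseDirectory.value / "lib_managed" / "jars"
  destDir.mkdirs()
  datanucleusJars.foreach { jar =>
    Files.copy(jar.toPath, (destDir / jar.getName).toPath,
      StandardCopyOption.REPLACE_EXISTING)
  }
}

// Run it whenever the assembly is built, for example:
// assembly := assembly.dependsOn(deployDatanucleusJars).value
```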
spark git commit: [SPARK-11382] Replace example code in mllib-decision-tree.md using include_example
Repository: spark Updated Branches: refs/heads/branch-1.6 39c1ebcd3 -> 6c1409e0b [SPARK-11382] Replace example code in mllib-decision-tree.md using include_example https://issues.apache.org/jira/browse/SPARK-11382 B.T.W. I fix an error in naive_bayes_example.py. Author: Xusen YinCloses #9596 from yinxusen/SPARK-11382. (cherry picked from commit a81f47ff7498e7063c855ccf75bba81ab101b43e) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c1409e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c1409e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c1409e0 Branch: refs/heads/branch-1.6 Commit: 6c1409e0b57cb0202c434a9f6c2593e5ae241e70 Parents: 39c1ebc Author: Xusen Yin Authored: Tue Nov 10 10:05:53 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 10:06:02 2015 -0800 -- docs/mllib-decision-tree.md | 253 +-- .../JavaDecisionTreeClassificationExample.java | 91 +++ .../JavaDecisionTreeRegressionExample.java | 96 +++ .../decision_tree_classification_example.py | 55 .../mllib/decision_tree_regression_example.py | 56 .../main/python/mllib/naive_bayes_example.py| 1 + .../DecisionTreeClassificationExample.scala | 67 + .../mllib/DecisionTreeRegressionExample.scala | 66 + 8 files changed, 438 insertions(+), 247 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c1409e0/docs/mllib-decision-tree.md -- diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index b5b454b..77ce34e 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -194,137 +194,19 @@ maximum tree depth of 5. The test error is calculated to measure the algorithm a Refer to the [`DecisionTree` Scala docs](api/scala/index.html#org.apache.spark.mllib.tree.DecisionTree) and [`DecisionTreeModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.tree.model.DecisionTreeModel) for details on the API. -{% highlight scala %} -import org.apache.spark.mllib.tree.DecisionTree -import org.apache.spark.mllib.tree.model.DecisionTreeModel -import org.apache.spark.mllib.util.MLUtils - -// Load and parse the data file. -val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") -// Split the data into training and test sets (30% held out for testing) -val splits = data.randomSplit(Array(0.7, 0.3)) -val (trainingData, testData) = (splits(0), splits(1)) - -// Train a DecisionTree model. -// Empty categoricalFeaturesInfo indicates all features are continuous. 
-val numClasses = 2 -val categoricalFeaturesInfo = Map[Int, Int]() -val impurity = "gini" -val maxDepth = 5 -val maxBins = 32 - -val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, - impurity, maxDepth, maxBins) - -// Evaluate model on test instances and compute test error -val labelAndPreds = testData.map { point => - val prediction = model.predict(point.features) - (point.label, prediction) -} -val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() -println("Test Error = " + testErr) -println("Learned classification tree model:\n" + model.toDebugString) - -// Save and load model -model.save(sc, "myModelPath") -val sameModel = DecisionTreeModel.load(sc, "myModelPath") -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/DecisionTreeClassificationExample.scala %} Refer to the [`DecisionTree` Java docs](api/java/org/apache/spark/mllib/tree/DecisionTree.html) and [`DecisionTreeModel` Java docs](api/java/org/apache/spark/mllib/tree/model/DecisionTreeModel.html) for details on the API. -{% highlight java %} -import java.util.HashMap; -import scala.Tuple2; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.tree.DecisionTree; -import org.apache.spark.mllib.tree.model.DecisionTreeModel; -import org.apache.spark.mllib.util.MLUtils; -import org.apache.spark.SparkConf; - -SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); -JavaSparkContext sc = new JavaSparkContext(sparkConf); - -// Load and parse the data file. -String datapath = "data/mllib/sample_libsvm_data.txt"; -JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); -// Split the data into training and test sets (30% held out for
spark git commit: [SPARK-11590][SQL] use native json_tuple in lateral view
Repository: spark Updated Branches: refs/heads/branch-1.6 6e2e84f3e -> 5ccc1eb08 [SPARK-11590][SQL] use native json_tuple in lateral view Author: Wenchen FanCloses #9562 from cloud-fan/json-tuple. (cherry picked from commit 53600854c270d4c953fe95fbae528740b5cf6603) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ccc1eb0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ccc1eb0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ccc1eb0 Branch: refs/heads/branch-1.6 Commit: 5ccc1eb08c14291bb1e94b1cd9fa3bff1172529d Parents: 6e2e84f Author: Wenchen Fan Authored: Tue Nov 10 11:21:31 2015 -0800 Committer: Yin Huai Committed: Tue Nov 10 11:22:11 2015 -0800 -- .../catalyst/expressions/jsonExpressions.scala | 23 +-- .../expressions/JsonExpressionsSuite.scala | 30 +++ .../scala/org/apache/spark/sql/DataFrame.scala | 8 +++-- .../scala/org/apache/spark/sql/functions.scala | 12 .../apache/spark/sql/JsonFunctionsSuite.scala | 23 --- .../org/apache/spark/sql/hive/HiveQl.scala | 4 +++ .../org/apache/spark/sql/hive/HiveQlSuite.scala | 13 .../sql/hive/execution/SQLQuerySuite.scala | 31 8 files changed, 104 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 8c9853e..8cd7323 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression) } case class JsonTuple(children: Seq[Expression]) - extends Expression with CodegenFallback { + extends Generator with CodegenFallback { import SharedFactory._ @@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression]) } // if processing fails this shared value will be returned - @transient private lazy val nullRow: InternalRow = -new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + @transient private lazy val nullRow: Seq[InternalRow] = +new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head @@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression]) // and count the number of foldable fields, we'll use this later to optimize evaluation @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) - override lazy val dataType: StructType = { -val fields = fieldExpressions.zipWithIndex.map { - case (_, idx) => StructField( -name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize -dataType = StringType, -nullable = true) -} - -StructType(fields) + override def elementTypes: Seq[(DataType, Boolean, String)] = fieldExpressions.zipWithIndex.map { +case (_, idx) => (StringType, true, s"c$idx") } override def prettyName: String = "json_tuple" @@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression]) } } - override def eval(input: InternalRow): InternalRow = { + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] if (json == null) { return nullRow @@ -383,7 
+376,7 @@ case class JsonTuple(children: Seq[Expression]) } } - private def parseRow(parser: JsonParser, input: InternalRow): InternalRow = { + private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = { // only objects are supported if (parser.nextToken() != JsonToken.START_OBJECT) { return nullRow @@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression]) parser.skipChildren() } -new GenericInternalRow(row) +new GenericInternalRow(row) :: Nil } private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala -- diff --git
spark git commit: [SPARK-11590][SQL] use native json_tuple in lateral view
Repository: spark Updated Branches: refs/heads/master dfcfcbcc0 -> 53600854c [SPARK-11590][SQL] use native json_tuple in lateral view Author: Wenchen FanCloses #9562 from cloud-fan/json-tuple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53600854 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53600854 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53600854 Branch: refs/heads/master Commit: 53600854c270d4c953fe95fbae528740b5cf6603 Parents: dfcfcbc Author: Wenchen Fan Authored: Tue Nov 10 11:21:31 2015 -0800 Committer: Yin Huai Committed: Tue Nov 10 11:21:31 2015 -0800 -- .../catalyst/expressions/jsonExpressions.scala | 23 +-- .../expressions/JsonExpressionsSuite.scala | 30 +++ .../scala/org/apache/spark/sql/DataFrame.scala | 8 +++-- .../scala/org/apache/spark/sql/functions.scala | 12 .../apache/spark/sql/JsonFunctionsSuite.scala | 23 --- .../org/apache/spark/sql/hive/HiveQl.scala | 4 +++ .../org/apache/spark/sql/hive/HiveQlSuite.scala | 13 .../sql/hive/execution/SQLQuerySuite.scala | 31 8 files changed, 104 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53600854/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 8c9853e..8cd7323 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression) } case class JsonTuple(children: Seq[Expression]) - extends Expression with CodegenFallback { + extends Generator with CodegenFallback { import SharedFactory._ @@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression]) } // if processing fails this shared value will be returned - @transient private lazy val nullRow: InternalRow = -new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + @transient private lazy val nullRow: Seq[InternalRow] = +new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head @@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression]) // and count the number of foldable fields, we'll use this later to optimize evaluation @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) - override lazy val dataType: StructType = { -val fields = fieldExpressions.zipWithIndex.map { - case (_, idx) => StructField( -name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize -dataType = StringType, -nullable = true) -} - -StructType(fields) + override def elementTypes: Seq[(DataType, Boolean, String)] = fieldExpressions.zipWithIndex.map { +case (_, idx) => (StringType, true, s"c$idx") } override def prettyName: String = "json_tuple" @@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression]) } } - override def eval(input: InternalRow): InternalRow = { + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] if (json == null) { return nullRow @@ -383,7 +376,7 @@ case class JsonTuple(children: Seq[Expression]) } } - private def parseRow(parser: JsonParser, 
input: InternalRow): InternalRow = { + private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = { // only objects are supported if (parser.nextToken() != JsonToken.START_OBJECT) { return nullRow @@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression]) parser.skipChildren() } -new GenericInternalRow(row) +new GenericInternalRow(row) :: Nil } private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/53600854/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
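A sketch of the DataFrame-side usage this change enables; the json_tuple helper in org.apache.spark.sql.functions and its (Column, String*) signature are assumptions based on the functions.scala and JsonFunctionsSuite.scala files touched in this diff:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.json_tuple

    // `json` is a hypothetical string column holding a JSON object; the generator
    // expands into one output column per requested field (c0, c1 by default).
    def extractFields(df: DataFrame): DataFrame =
      df.select(json_tuple(df("json"), "age", "city"))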
spark git commit: [SPARK-11252][NETWORK] ShuffleClient should release connections after block fetches have completed for the external shuffle service
Repository: spark Updated Branches: refs/heads/master 689386b1c -> 6e5fc3788 [SPARK-11252][NETWORK] ShuffleClient should release connections after block fetches have completed for the external shuffle service With YARN's external shuffle service, the ExternalShuffleClient in each executor keeps its connections to YARN's NodeManager open until the application completes, so the NodeManager and the executors end up holding many socket connections. To reduce the network pressure on the NodeManager's shuffle service, the connection to it should be closed once registerWithShuffleServer or fetchBlocks has completed in ExternalShuffleClient. cc andrewor14 rxin vanzin Author: Lianhui Wang Closes #9227 from lianhuiwang/spark-11252. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e5fc378 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e5fc378 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e5fc378 Branch: refs/heads/master Commit: 6e5fc37883ed81c3ee2338145a48de3036d19399 Parents: 689386b Author: Lianhui Wang Authored: Tue Nov 10 10:40:08 2015 -0800 Committer: Marcelo Vanzin Committed: Tue Nov 10 10:40:08 2015 -0800 -- .../spark/deploy/ExternalShuffleService.scala | 3 +- .../apache/spark/network/TransportContext.java | 11 ++- .../network/client/TransportClientFactory.java | 10 ++ .../network/server/TransportChannelHandler.java | 26 +-- .../network/TransportClientFactorySuite.java| 34 .../network/shuffle/ExternalShuffleClient.java | 12 --- 6 files changed, 81 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 6840a3a..a039d54 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -47,7 +47,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0) private val blockHandler = newShuffleBlockHandler(transportConf) - private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler) + private val transportContext: TransportContext = +new TransportContext(transportConf, blockHandler, true) private var server: TransportServer = _ http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/common/src/main/java/org/apache/spark/network/TransportContext.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java index 43900e6..1b64b86 100644 --- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java @@ -59,15 +59,24 @@ public class TransportContext { private final TransportConf conf; private final RpcHandler rpcHandler; + private final boolean closeIdleConnections; private final MessageEncoder encoder; private final MessageDecoder decoder; public TransportContext(TransportConf conf, RpcHandler rpcHandler) { +this(conf, rpcHandler, false); + } + + public TransportContext( + TransportConf conf, + RpcHandler rpcHandler, + boolean closeIdleConnections) { this.conf = conf;
this.rpcHandler = rpcHandler; this.encoder = new MessageEncoder(); this.decoder = new MessageDecoder(); +this.closeIdleConnections = closeIdleConnections; } /** @@ -144,7 +153,7 @@ public class TransportContext { TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler); return new TransportChannelHandler(client, responseHandler, requestHandler, - conf.connectionTimeoutMs()); + conf.connectionTimeoutMs(), closeIdleConnections); } public TransportConf getConf() { return conf; } http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
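A minimal sketch of the new three-argument constructor in use, mirroring the ExternalShuffleService hunk above; passing closeIdleConnections = true is what lets the shuffle service side drop connections that have gone idle once fetches finish:

    import org.apache.spark.network.TransportContext
    import org.apache.spark.network.server.RpcHandler
    import org.apache.spark.network.util.TransportConf

    // The two-argument constructor keeps the old behavior (closeIdleConnections = false).
    def newShuffleServiceContext(conf: TransportConf, handler: RpcHandler): TransportContext =
      new TransportContext(conf, handler, true)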
[3/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala deleted file mode 100644 index 9b22ce2..000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala +++ /dev/null @@ -1,467 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions.aggregate - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.types._ - -/** - * Utility functions used by the query planner to convert our plan to new aggregation code path. - */ -object Utils { - - // Check if the DataType given cannot be part of a group by clause. - private def isUnGroupable(dt: DataType): Boolean = dt match { -case _: ArrayType | _: MapType => true -case s: StructType => s.fields.exists(f => isUnGroupable(f.dataType)) -case _ => false - } - - // Right now, we do not support complex types in the grouping key schema. 
- private def supportsGroupingKeySchema(aggregate: Aggregate): Boolean = -!aggregate.groupingExpressions.exists(e => isUnGroupable(e.dataType)) - - private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match { -case p: Aggregate if supportsGroupingKeySchema(p) => - - val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown { -case expressions.Average(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Average(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Count(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Count(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.CountDistinct(children) => - val child = if (children.size > 1) { -DropAnyNull(CreateStruct(children)) - } else { -children.head - } - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Count(child), -mode = aggregate.Complete, -isDistinct = true) - -case expressions.First(child, ignoreNulls) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.First(child, ignoreNulls), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Kurtosis(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Kurtosis(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Last(child, ignoreNulls) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Last(child, ignoreNulls), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Max(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Max(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Min(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Min(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Skewness(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Skewness(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.StddevPop(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.StddevPop(child), -
[2/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index ed810a1..0290faf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -68,7 +68,7 @@ class FilterPushdownSuite extends PlanTest { test("column pruning for group") { val originalQuery = testRelation -.groupBy('a)('a, Count('b)) +.groupBy('a)('a, count('b)) .select('a) val optimized = Optimize.execute(originalQuery.analyze) @@ -84,7 +84,7 @@ class FilterPushdownSuite extends PlanTest { test("column pruning for group with alias") { val originalQuery = testRelation -.groupBy('a)('a as 'c, Count('b)) +.groupBy('a)('a as 'c, count('b)) .select('c) val optimized = Optimize.execute(originalQuery.analyze) @@ -656,7 +656,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: push down filter when filter on group by expression") { val originalQuery = testRelation -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .select('a, 'c) .where('a === 2) @@ -664,7 +664,7 @@ class FilterPushdownSuite extends PlanTest { val correctAnswer = testRelation .where('a === 2) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .analyze comparePlans(optimized, correctAnswer) } @@ -672,7 +672,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: don't push down filter when filter not on group by expression") { val originalQuery = testRelation .select('a, 'b) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .where('c === 2L) val optimized = Optimize.execute(originalQuery.analyze) @@ -683,7 +683,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: push down filters partially which are subset of group by expressions") { val originalQuery = testRelation .select('a, 'b) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .where('c === 2L && 'a === 3) val optimized = Optimize.execute(originalQuery.analyze) @@ -691,7 +691,7 @@ class FilterPushdownSuite extends PlanTest { val correctAnswer = testRelation .select('a, 'b) .where('a === 3) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .where('c === 2L) .analyze http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d25807c..3b69247 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.encoders.Encoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} @@ -1338,7 +1339,7 @@ class DataFrame private[sql]( if (groupColExprIds.contains(attr.exprId)) { attr } else { -Alias(First(attr), 
attr.name)() +Alias(new First(attr).toAggregateExpression(), attr.name)() } } Aggregate(groupCols, aggCols, logicalPlan) @@ -1381,11 +1382,11 @@ class DataFrame private[sql]( // The list of summary statistics to compute, in the form of expressions. val statistics = List[(String, Expression => Expression)]( - "count" -> Count, - "mean" -> Average, - "stddev" -> StddevSamp, - "min" -> Min, - "max" -> Max) + "count" -> ((child: Expression) => Count(child).toAggregateExpression()), + "mean" ->
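The DataFrame.scala hunk above shows the new construction pattern: an interface-based aggregate function is wrapped into a full aggregate expression via toAggregateExpression() before it is placed in the plan. A minimal sketch, using only names that appear in this diff:

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.aggregate.Count

    // Complete-mode, non-distinct count over an arbitrary child expression.
    def countExpr(child: Expression): Expression =
      Count(child).toAggregateExpression()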
[3/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala deleted file mode 100644 index 9b22ce2..000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala +++ /dev/null @@ -1,467 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions.aggregate - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.types._ - -/** - * Utility functions used by the query planner to convert our plan to new aggregation code path. - */ -object Utils { - - // Check if the DataType given cannot be part of a group by clause. - private def isUnGroupable(dt: DataType): Boolean = dt match { -case _: ArrayType | _: MapType => true -case s: StructType => s.fields.exists(f => isUnGroupable(f.dataType)) -case _ => false - } - - // Right now, we do not support complex types in the grouping key schema. 
- private def supportsGroupingKeySchema(aggregate: Aggregate): Boolean = -!aggregate.groupingExpressions.exists(e => isUnGroupable(e.dataType)) - - private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match { -case p: Aggregate if supportsGroupingKeySchema(p) => - - val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown { -case expressions.Average(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Average(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Count(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Count(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.CountDistinct(children) => - val child = if (children.size > 1) { -DropAnyNull(CreateStruct(children)) - } else { -children.head - } - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Count(child), -mode = aggregate.Complete, -isDistinct = true) - -case expressions.First(child, ignoreNulls) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.First(child, ignoreNulls), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Kurtosis(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Kurtosis(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Last(child, ignoreNulls) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Last(child, ignoreNulls), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Max(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Max(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Min(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Min(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.Skewness(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.Skewness(child), -mode = aggregate.Complete, -isDistinct = false) - -case expressions.StddevPop(child) => - aggregate.AggregateExpression2( -aggregateFunction = aggregate.StddevPop(child), -
[2/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index ed810a1..0290faf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -68,7 +68,7 @@ class FilterPushdownSuite extends PlanTest { test("column pruning for group") { val originalQuery = testRelation -.groupBy('a)('a, Count('b)) +.groupBy('a)('a, count('b)) .select('a) val optimized = Optimize.execute(originalQuery.analyze) @@ -84,7 +84,7 @@ class FilterPushdownSuite extends PlanTest { test("column pruning for group with alias") { val originalQuery = testRelation -.groupBy('a)('a as 'c, Count('b)) +.groupBy('a)('a as 'c, count('b)) .select('c) val optimized = Optimize.execute(originalQuery.analyze) @@ -656,7 +656,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: push down filter when filter on group by expression") { val originalQuery = testRelation -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .select('a, 'c) .where('a === 2) @@ -664,7 +664,7 @@ class FilterPushdownSuite extends PlanTest { val correctAnswer = testRelation .where('a === 2) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .analyze comparePlans(optimized, correctAnswer) } @@ -672,7 +672,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: don't push down filter when filter not on group by expression") { val originalQuery = testRelation .select('a, 'b) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .where('c === 2L) val optimized = Optimize.execute(originalQuery.analyze) @@ -683,7 +683,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: push down filters partially which are subset of group by expressions") { val originalQuery = testRelation .select('a, 'b) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .where('c === 2L && 'a === 3) val optimized = Optimize.execute(originalQuery.analyze) @@ -691,7 +691,7 @@ class FilterPushdownSuite extends PlanTest { val correctAnswer = testRelation .select('a, 'b) .where('a === 3) -.groupBy('a)('a, Count('b) as 'c) +.groupBy('a)('a, count('b) as 'c) .where('c === 2L) .analyze http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d25807c..3b69247 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.encoders.Encoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} @@ -1338,7 +1339,7 @@ class DataFrame private[sql]( if (groupColExprIds.contains(attr.exprId)) { attr } else { -Alias(First(attr), 
attr.name)() +Alias(new First(attr).toAggregateExpression(), attr.name)() } } Aggregate(groupCols, aggCols, logicalPlan) @@ -1381,11 +1382,11 @@ class DataFrame private[sql]( // The list of summary statistics to compute, in the form of expressions. val statistics = List[(String, Expression => Expression)]( - "count" -> Count, - "mean" -> Average, - "stddev" -> StddevSamp, - "min" -> Min, - "max" -> Max) + "count" -> ((child: Expression) => Count(child).toAggregateExpression()), + "mean" ->
[1/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
Repository: spark Updated Branches: refs/heads/master 6e5fc3788 -> e0701c756 http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ab88c1e..6f8ed41 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -38,6 +38,7 @@ import org.apache.spark.Logging import org.apache.spark.sql.{AnalysisException, catalyst} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.{logical, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -1508,9 +1509,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C UnresolvedStar(Some(UnresolvedAttribute.parseAttributeName(name))) /* Aggregate Functions */ -case Token("TOK_FUNCTIONSTAR", Token(COUNT(), Nil) :: Nil) => Count(Literal(1)) -case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) => CountDistinct(args.map(nodeToExpr)) -case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg)) +case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) => + Count(args.map(nodeToExpr)).toAggregateExpression(isDistinct = true) +case Token("TOK_FUNCTIONSTAR", Token(COUNT(), Nil) :: Nil) => + Count(Literal(1)).toAggregateExpression() /* Casts */ case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) => http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index ea36c13..6bf2c53 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -69,11 +69,7 @@ class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFun abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import testImplicits._ - var originalUseAggregate2: Boolean = _ - override def beforeAll(): Unit = { -originalUseAggregate2 = sqlContext.conf.useSqlAggregate2 -sqlContext.setConf(SQLConf.USE_SQL_AGGREGATE2.key, "true") val data1 = Seq[(Integer, Integer)]( (1, 10), (null, -60), @@ -120,7 +116,6 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te sqlContext.sql("DROP TABLE IF EXISTS agg1") sqlContext.sql("DROP TABLE IF EXISTS agg2") sqlContext.dropTempTable("emptyTable") -sqlContext.setConf(SQLConf.USE_SQL_AGGREGATE2.key, originalUseAggregate2.toString) } test("empty table") { @@ -447,73 +442,80 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } test("single distinct column set") { -// DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. 
-checkAnswer( - sqlContext.sql( -""" - |SELECT - | min(distinct value1), - | sum(distinct value1), - | avg(value1), - | avg(value2), - | max(distinct value1) - |FROM agg2 -""".stripMargin), - Row(-60, 70.0, 101.0/9.0, 5.6, 100)) - -checkAnswer( - sqlContext.sql( -""" - |SELECT - | mydoubleavg(distinct value1), - | avg(value1), - | avg(value2), - | key, - | mydoubleavg(value1 - 1), - | mydoubleavg(distinct value1) * 0.1, - | avg(value1 + value2) - |FROM agg2 - |GROUP BY key -""".stripMargin), - Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) :: -Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) :: -Row(null, null, 3.0, 3, null, null, null) :: -Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) - -checkAnswer( - sqlContext.sql( -""" - |SELECT - | key, - | mydoubleavg(distinct value1), - | mydoublesum(value2), - | mydoublesum(distinct value1), - | mydoubleavg(distinct value1), - | mydoubleavg(value1) - |FROM agg2 - |GROUP BY key -""".stripMargin), - Row(1,
[4/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
[SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s https://issues.apache.org/jira/browse/SPARK-9830 This PR contains the following main changes. * Removing `AggregateExpression1`. * Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`. * Removing planner rule used to plan `Aggregate`. * Linking `MultipleDistinctRewriter` to analyzer. * Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`. * Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`. * Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved). Author: Yin HuaiCloses #9556 from yhuai/removeAgg1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0701c75 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0701c75 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0701c75 Branch: refs/heads/master Commit: e0701c75601c43f69ed27fc7c252321703db51f2 Parents: 6e5fc37 Author: Yin Huai Authored: Tue Nov 10 11:06:29 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:06:29 2015 -0800 -- R/pkg/R/functions.R |2 +- python/pyspark/sql/dataframe.py |2 +- python/pyspark/sql/functions.py |2 +- python/pyspark/sql/tests.py |2 +- .../spark/sql/catalyst/CatalystConf.scala | 10 +- .../apache/spark/sql/catalyst/SqlParser.scala | 14 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 26 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 46 +- .../analysis/DistinctAggregationRewriter.scala | 278 + .../catalyst/analysis/FunctionRegistry.scala|2 + .../catalyst/analysis/HiveTypeCoercion.scala| 20 +- .../sql/catalyst/analysis/unresolved.scala |4 + .../apache/spark/sql/catalyst/dsl/package.scala | 22 +- .../expressions/aggregate/Average.scala | 31 +- .../aggregate/CentralMomentAgg.scala| 13 +- .../catalyst/expressions/aggregate/Corr.scala | 15 + .../catalyst/expressions/aggregate/Count.scala | 28 +- .../catalyst/expressions/aggregate/First.scala | 14 +- .../aggregate/HyperLogLogPlusPlus.scala | 17 + .../expressions/aggregate/Kurtosis.scala|2 + .../catalyst/expressions/aggregate/Last.scala | 12 +- .../catalyst/expressions/aggregate/Max.scala| 17 +- .../catalyst/expressions/aggregate/Min.scala| 17 +- .../expressions/aggregate/Skewness.scala|2 + .../catalyst/expressions/aggregate/Stddev.scala | 31 +- .../catalyst/expressions/aggregate/Sum.scala| 29 +- .../catalyst/expressions/aggregate/Utils.scala | 467 .../expressions/aggregate/Variance.scala|7 +- .../expressions/aggregate/interfaces.scala | 57 +- .../sql/catalyst/expressions/aggregates.scala | 1073 -- .../sql/catalyst/optimizer/Optimizer.scala | 23 +- .../spark/sql/catalyst/planning/patterns.scala | 74 -- .../spark/sql/catalyst/plans/QueryPlan.scala| 12 +- .../catalyst/plans/logical/basicOperators.scala |4 +- .../catalyst/analysis/AnalysisErrorSuite.scala | 23 +- .../sql/catalyst/analysis/AnalysisSuite.scala |2 +- .../analysis/DecimalPrecisionSuite.scala|1 + .../analysis/ExpressionTypeCheckingSuite.scala |6 +- .../optimizer/ConstantFoldingSuite.scala|4 +- .../optimizer/FilterPushdownSuite.scala | 14 +- .../scala/org/apache/spark/sql/DataFrame.scala | 13 +- .../org/apache/spark/sql/GroupedData.scala | 45 +- 
.../scala/org/apache/spark/sql/SQLConf.scala| 20 +- .../apache/spark/sql/execution/Aggregate.scala | 205 .../org/apache/spark/sql/execution/Expand.scala |3 + .../spark/sql/execution/SparkPlanner.scala |1 - .../spark/sql/execution/SparkStrategies.scala | 238 ++-- .../aggregate/AggregationIterator.scala | 28 +- .../aggregate/SortBasedAggregate.scala |4 +- .../SortBasedAggregationIterator.scala |8 +- .../execution/aggregate/TungstenAggregate.scala |6 +- .../aggregate/TungstenAggregationIterator.scala | 36 +- .../spark/sql/execution/aggregate/udaf.scala|2 +- .../spark/sql/execution/aggregate/utils.scala | 20 +- .../spark/sql/expressions/Aggregator.scala |5 +- .../spark/sql/expressions/WindowSpec.scala | 82 +- .../org/apache/spark/sql/expressions/udaf.scala |6 +-
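The description above names the canonical way to build an aggregate expression after this change, AggregateExpression(aggregateFunction, mode, isDistinct). A minimal sketch of that constructor, with Average and Complete taken from the catalyst aggregate package listed in the file summary:

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Complete}

    // avg(child) evaluated in Complete mode; set isDistinct = true for avg(DISTINCT child).
    def avgExpr(child: Expression, distinct: Boolean = false): AggregateExpression =
      AggregateExpression(Average(child), Complete, isDistinct = distinct)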
[1/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
Repository: spark Updated Branches: refs/heads/branch-1.6 825e971d0 -> 7c4ade0d7 http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ab88c1e..6f8ed41 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -38,6 +38,7 @@ import org.apache.spark.Logging import org.apache.spark.sql.{AnalysisException, catalyst} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.{logical, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -1508,9 +1509,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C UnresolvedStar(Some(UnresolvedAttribute.parseAttributeName(name))) /* Aggregate Functions */ -case Token("TOK_FUNCTIONSTAR", Token(COUNT(), Nil) :: Nil) => Count(Literal(1)) -case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) => CountDistinct(args.map(nodeToExpr)) -case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg)) +case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) => + Count(args.map(nodeToExpr)).toAggregateExpression(isDistinct = true) +case Token("TOK_FUNCTIONSTAR", Token(COUNT(), Nil) :: Nil) => + Count(Literal(1)).toAggregateExpression() /* Casts */ case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) => http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index ea36c13..6bf2c53 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -69,11 +69,7 @@ class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFun abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import testImplicits._ - var originalUseAggregate2: Boolean = _ - override def beforeAll(): Unit = { -originalUseAggregate2 = sqlContext.conf.useSqlAggregate2 -sqlContext.setConf(SQLConf.USE_SQL_AGGREGATE2.key, "true") val data1 = Seq[(Integer, Integer)]( (1, 10), (null, -60), @@ -120,7 +116,6 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te sqlContext.sql("DROP TABLE IF EXISTS agg1") sqlContext.sql("DROP TABLE IF EXISTS agg2") sqlContext.dropTempTable("emptyTable") -sqlContext.setConf(SQLConf.USE_SQL_AGGREGATE2.key, originalUseAggregate2.toString) } test("empty table") { @@ -447,73 +442,80 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } test("single distinct column set") { -// DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. 
-checkAnswer( - sqlContext.sql( -""" - |SELECT - | min(distinct value1), - | sum(distinct value1), - | avg(value1), - | avg(value2), - | max(distinct value1) - |FROM agg2 -""".stripMargin), - Row(-60, 70.0, 101.0/9.0, 5.6, 100)) - -checkAnswer( - sqlContext.sql( -""" - |SELECT - | mydoubleavg(distinct value1), - | avg(value1), - | avg(value2), - | key, - | mydoubleavg(value1 - 1), - | mydoubleavg(distinct value1) * 0.1, - | avg(value1 + value2) - |FROM agg2 - |GROUP BY key -""".stripMargin), - Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) :: -Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) :: -Row(null, null, 3.0, 3, null, null, null) :: -Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) - -checkAnswer( - sqlContext.sql( -""" - |SELECT - | key, - | mydoubleavg(distinct value1), - | mydoublesum(value2), - | mydoublesum(distinct value1), - | mydoubleavg(distinct value1), - | mydoubleavg(value1) - |FROM agg2 - |GROUP BY key -""".stripMargin), -
[4/4] spark git commit: [SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s
[SPARK-9830][SQL] Remove AggregateExpression1 and Aggregate Operator used to evaluate AggregateExpression1s https://issues.apache.org/jira/browse/SPARK-9830 This PR contains the following main changes. * Removing `AggregateExpression1`. * Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`. * Removing planner rule used to plan `Aggregate`. * Linking `MultipleDistinctRewriter` to analyzer. * Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`. * Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`. * Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved). Author: Yin HuaiCloses #9556 from yhuai/removeAgg1. (cherry picked from commit e0701c75601c43f69ed27fc7c252321703db51f2) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c4ade0d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c4ade0d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c4ade0d Branch: refs/heads/branch-1.6 Commit: 7c4ade0d7665e0f473d00f4a812fa69a0e0d14b5 Parents: 825e971 Author: Yin Huai Authored: Tue Nov 10 11:06:29 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:06:48 2015 -0800 -- R/pkg/R/functions.R |2 +- python/pyspark/sql/dataframe.py |2 +- python/pyspark/sql/functions.py |2 +- python/pyspark/sql/tests.py |2 +- .../spark/sql/catalyst/CatalystConf.scala | 10 +- .../apache/spark/sql/catalyst/SqlParser.scala | 14 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 26 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 46 +- .../analysis/DistinctAggregationRewriter.scala | 278 + .../catalyst/analysis/FunctionRegistry.scala|2 + .../catalyst/analysis/HiveTypeCoercion.scala| 20 +- .../sql/catalyst/analysis/unresolved.scala |4 + .../apache/spark/sql/catalyst/dsl/package.scala | 22 +- .../expressions/aggregate/Average.scala | 31 +- .../aggregate/CentralMomentAgg.scala| 13 +- .../catalyst/expressions/aggregate/Corr.scala | 15 + .../catalyst/expressions/aggregate/Count.scala | 28 +- .../catalyst/expressions/aggregate/First.scala | 14 +- .../aggregate/HyperLogLogPlusPlus.scala | 17 + .../expressions/aggregate/Kurtosis.scala|2 + .../catalyst/expressions/aggregate/Last.scala | 12 +- .../catalyst/expressions/aggregate/Max.scala| 17 +- .../catalyst/expressions/aggregate/Min.scala| 17 +- .../expressions/aggregate/Skewness.scala|2 + .../catalyst/expressions/aggregate/Stddev.scala | 31 +- .../catalyst/expressions/aggregate/Sum.scala| 29 +- .../catalyst/expressions/aggregate/Utils.scala | 467 .../expressions/aggregate/Variance.scala|7 +- .../expressions/aggregate/interfaces.scala | 57 +- .../sql/catalyst/expressions/aggregates.scala | 1073 -- .../sql/catalyst/optimizer/Optimizer.scala | 23 +- .../spark/sql/catalyst/planning/patterns.scala | 74 -- .../spark/sql/catalyst/plans/QueryPlan.scala| 12 +- .../catalyst/plans/logical/basicOperators.scala |4 +- .../catalyst/analysis/AnalysisErrorSuite.scala | 23 +- .../sql/catalyst/analysis/AnalysisSuite.scala |2 +- .../analysis/DecimalPrecisionSuite.scala|1 + .../analysis/ExpressionTypeCheckingSuite.scala |6 +- .../optimizer/ConstantFoldingSuite.scala|4 +- .../optimizer/FilterPushdownSuite.scala | 14 +- 
.../scala/org/apache/spark/sql/DataFrame.scala | 13 +- .../org/apache/spark/sql/GroupedData.scala | 45 +- .../scala/org/apache/spark/sql/SQLConf.scala| 20 +- .../apache/spark/sql/execution/Aggregate.scala | 205 .../org/apache/spark/sql/execution/Expand.scala |3 + .../spark/sql/execution/SparkPlanner.scala |1 - .../spark/sql/execution/SparkStrategies.scala | 238 ++-- .../aggregate/AggregationIterator.scala | 28 +- .../aggregate/SortBasedAggregate.scala |4 +- .../SortBasedAggregationIterator.scala |8 +- .../execution/aggregate/TungstenAggregate.scala |6 +- .../aggregate/TungstenAggregationIterator.scala | 36 +- .../spark/sql/execution/aggregate/udaf.scala|2 +- .../spark/sql/execution/aggregate/utils.scala | 20 +- .../spark/sql/expressions/Aggregator.scala |5 +-
spark git commit: [ML][R] SparkR::glm summary result to compare with native R
Repository: spark Updated Branches: refs/heads/branch-1.6 f38509a76 -> 54685fa36 [ML][R] SparkR::glm summary result to compare with native R Follow up #9561. Due to [SPARK-11587](https://issues.apache.org/jira/browse/SPARK-11587) has been fixed, we should compare SparkR::glm summary result with native R output rather than hard-code one. mengxr Author: Yanbo LiangCloses #9590 from yanboliang/glm-r-test. (cherry picked from commit f14e95115c0939a77ebcb00209696a87fd651ff9) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54685fa3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54685fa3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54685fa3 Branch: refs/heads/branch-1.6 Commit: 54685fa3637e7babf509adfeea0d6af652eeeb7e Parents: f38509a Author: Yanbo Liang Authored: Tue Nov 10 11:34:36 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 11:34:44 2015 -0800 -- R/pkg/R/mllib.R | 2 +- R/pkg/inst/tests/test_mllib.R | 31 ++- 2 files changed, 11 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/54685fa3/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 7126b7c..f23e1c7 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -106,7 +106,7 @@ setMethod("summary", signature(object = "PipelineModel"), coefficients <- matrix(coefficients, ncol = 4) colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)") rownames(coefficients) <- unlist(features) - return(list(DevianceResiduals = devianceResiduals, Coefficients = coefficients)) + return(list(devianceResiduals = devianceResiduals, coefficients = coefficients)) } else { coefficients <- as.matrix(unlist(coefficients)) colnames(coefficients) <- c("Estimate") http://git-wip-us.apache.org/repos/asf/spark/blob/54685fa3/R/pkg/inst/tests/test_mllib.R -- diff --git a/R/pkg/inst/tests/test_mllib.R b/R/pkg/inst/tests/test_mllib.R index 42287ea..d497ad8 100644 --- a/R/pkg/inst/tests/test_mllib.R +++ b/R/pkg/inst/tests/test_mllib.R @@ -72,22 +72,17 @@ test_that("feature interaction vs native glm", { test_that("summary coefficients match with native glm", { training <- createDataFrame(sqlContext, iris) stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal")) - coefs <- unlist(stats$Coefficients) - devianceResiduals <- unlist(stats$DevianceResiduals) + coefs <- unlist(stats$coefficients) + devianceResiduals <- unlist(stats$devianceResiduals) - rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))) - rStdError <- c(0.23536, 0.04630, 0.07207, 0.09331) - rTValue <- c(7.123, 7.557, -13.644, -10.798) - rPValue <- c(0.0, 0.0, 0.0, 0.0) + rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) + rCoefs <- unlist(rStats$coefficients) rDevianceResiduals <- c(-0.95096, 0.72918) - expect_true(all(abs(rCoefs - coefs[1:4]) < 1e-6)) - expect_true(all(abs(rStdError - coefs[5:8]) < 1e-5)) - expect_true(all(abs(rTValue - coefs[9:12]) < 1e-3)) - expect_true(all(abs(rPValue - coefs[13:16]) < 1e-6)) + expect_true(all(abs(rCoefs - coefs) < 1e-5)) expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5)) expect_true(all( -rownames(stats$Coefficients) == +rownames(stats$coefficients) == c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) }) @@ -96,21 +91,15 @@ test_that("summary coefficients match with native glm of family 'binomial'", { training <- filter(df, df$Species != 
"setosa") stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training, family = "binomial")) - coefs <- as.vector(stats$Coefficients) + coefs <- as.vector(stats$coefficients[,1]) rTraining <- iris[iris$Species %in% c("versicolor","virginica"),] rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, family = binomial(link = "logit" - rStdError <- c(3.0974, 0.5169, 0.8628) - rTValue <- c(-4.212, 3.680, 0.469) - rPValue <- c(0.000, 0.000, 0.639) - - expect_true(all(abs(rCoefs - coefs[1:3]) < 1e-4)) - expect_true(all(abs(rStdError - coefs[4:6]) < 1e-4)) - expect_true(all(abs(rTValue - coefs[7:9]) < 1e-3)) - expect_true(all(abs(rPValue - coefs[10:12]) < 1e-3)) + + expect_true(all(abs(rCoefs - coefs) < 1e-4)) expect_true(all( -rownames(stats$Coefficients) == +
spark git commit: [SPARK-11618][ML] Minor refactoring of basic ML import/export
Repository: spark Updated Branches: refs/heads/branch-1.6 54685fa36 -> 80641c4fa [SPARK-11618][ML] Minor refactoring of basic ML import/export Refactoring * separated overwrite and param save logic in DefaultParamsWriter * added sparkVersion to DefaultParamsWriter CC: mengxr Author: Joseph K. BradleyCloses #9587 from jkbradley/logreg-io. (cherry picked from commit 18350a57004eb87cafa9504ff73affab4b818e06) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80641c4f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80641c4f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80641c4f Branch: refs/heads/branch-1.6 Commit: 80641c4faf9b208728f22c7ecac5b0c298ee0c6d Parents: 54685fa Author: Joseph K. Bradley Authored: Tue Nov 10 11:36:43 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 11:36:50 2015 -0800 -- .../org/apache/spark/ml/util/ReadWrite.scala| 57 ++-- 1 file changed, 30 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80641c4f/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index ea790e0..cbdf913 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -51,6 +51,9 @@ private[util] sealed trait BaseReadWrite { protected final def sqlContext: SQLContext = optionSQLContext.getOrElse { SQLContext.getOrCreate(SparkContext.getOrCreate()) } + + /** Returns the [[SparkContext]] underlying [[sqlContext]] */ + protected final def sc: SparkContext = sqlContext.sparkContext } /** @@ -58,7 +61,7 @@ private[util] sealed trait BaseReadWrite { */ @Experimental @Since("1.6.0") -abstract class Writer extends BaseReadWrite { +abstract class Writer extends BaseReadWrite with Logging { protected var shouldOverwrite: Boolean = false @@ -67,7 +70,29 @@ abstract class Writer extends BaseReadWrite { */ @Since("1.6.0") @throws[IOException]("If the input path already exists but overwrite is not enabled.") - def save(path: String): Unit + def save(path: String): Unit = { +val hadoopConf = sc.hadoopConfiguration +val fs = FileSystem.get(hadoopConf) +val p = new Path(path) +if (fs.exists(p)) { + if (shouldOverwrite) { +logInfo(s"Path $path already exists. It will be overwritten.") +// TODO: Revert back to the original content if save is not successful. +fs.delete(p, true) + } else { +throw new IOException( + s"Path $path already exists. Please use write.overwrite().save(path) to overwrite it.") + } +} +saveImpl(path) + } + + /** + * [[save()]] handles overwriting and then calls this method. Subclasses should override this + * method to implement the actual saving of the instance. + */ + @Since("1.6.0") + protected def saveImpl(path: String): Unit /** * Overwrites if the output path already exists. @@ -147,28 +172,9 @@ trait Readable[T] { * data (e.g., models with coefficients). * @param instance object to save */ -private[ml] class DefaultParamsWriter(instance: Params) extends Writer with Logging { - - /** - * Saves the ML component to the input path. - */ - override def save(path: String): Unit = { -val sc = sqlContext.sparkContext - -val hadoopConf = sc.hadoopConfiguration -val fs = FileSystem.get(hadoopConf) -val p = new Path(path) -if (fs.exists(p)) { - if (shouldOverwrite) { -logInfo(s"Path $path already exists. 
It will be overwritten.") -// TODO: Revert back to the original content if save is not successful. -fs.delete(p, true) - } else { -throw new IOException( - s"Path $path already exists. Please use write.overwrite().save(path) to overwrite it.") - } -} +private[ml] class DefaultParamsWriter(instance: Params) extends Writer { + override protected def saveImpl(path: String): Unit = { val uid = instance.uid val cls = instance.getClass.getName val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]] @@ -177,6 +183,7 @@ private[ml] class DefaultParamsWriter(instance: Params) extends Writer with Logg }.toList val metadata = ("class" -> cls) ~ ("timestamp" -> System.currentTimeMillis()) ~ + ("sparkVersion" -> sc.version) ~ ("uid" -> uid) ~ ("paramMap" -> jsonParams) val metadataPath = new
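A sketch of the split this refactoring makes: save() now owns the overwrite/existence check and delegates the actual persistence to saveImpl(), which subclasses override. The FakeWriter class and its body are illustrative only:

    import org.apache.spark.ml.util.Writer

    class FakeWriter extends Writer {
      override protected def saveImpl(path: String): Unit = {
        // write metadata and data under `path`; overwrite handling already ran in save()
        println(s"writing model files under $path")
      }
    }

    // Caller-side pattern referenced by the IOException message above:
    //   instance.write.overwrite().save("/tmp/my-model")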
spark git commit: [SPARK-11567] [PYTHON] Add Python API for corr Aggregate function
Repository: spark Updated Branches: refs/heads/branch-1.6 6a74efab0 -> 6616f4da3 [SPARK-11567] [PYTHON] Add Python API for corr Aggregate function like `df.agg(corr("col1", "col2")` davies Author: felixcheungCloses #9536 from felixcheung/pyfunc. (cherry picked from commit 32790fe7249b0efe2cbc5c4ee2df0fb687dcd624) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6616f4da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6616f4da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6616f4da Branch: refs/heads/branch-1.6 Commit: 6616f4da3719667939ed41f81d2e2ba32bb87c72 Parents: 6a74efa Author: felixcheung Authored: Tue Nov 10 15:47:10 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 15:47:21 2015 -0800 -- python/pyspark/sql/functions.py | 16 1 file changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6616f4da/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 6e1cbde..c3da513 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -255,6 +255,22 @@ def coalesce(*cols): return Column(jc) +@since(1.6) +def corr(col1, col2): +"""Returns a new :class:`Column` for the Pearson Correlation Coefficient for ``col1`` +and ``col2``. + +>>> a = [x * x - 2 * x + 3.5 for x in range(20)] +>>> b = range(20) +>>> corrDf = sqlContext.createDataFrame(zip(a, b)) +>>> corrDf = corrDf.agg(corr(corrDf._1, corrDf._2).alias('c')) +>>> corrDf.selectExpr('abs(c - 0.9572339139475857) < 1e-16 as t').collect() +[Row(t=True)] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.corr(_to_java_column(col1), _to_java_column(col2))) + + @since(1.3) def countDistinct(col, *cols): """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11567] [PYTHON] Add Python API for corr Aggregate function
Repository: spark Updated Branches: refs/heads/master 638c51d93 -> 32790fe72 [SPARK-11567] [PYTHON] Add Python API for corr Aggregate function like `df.agg(corr("col1", "col2")` davies Author: felixcheungCloses #9536 from felixcheung/pyfunc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32790fe7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32790fe7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32790fe7 Branch: refs/heads/master Commit: 32790fe7249b0efe2cbc5c4ee2df0fb687dcd624 Parents: 638c51d Author: felixcheung Authored: Tue Nov 10 15:47:10 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 15:47:10 2015 -0800 -- python/pyspark/sql/functions.py | 16 1 file changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32790fe7/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 6e1cbde..c3da513 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -255,6 +255,22 @@ def coalesce(*cols): return Column(jc) +@since(1.6) +def corr(col1, col2): +"""Returns a new :class:`Column` for the Pearson Correlation Coefficient for ``col1`` +and ``col2``. + +>>> a = [x * x - 2 * x + 3.5 for x in range(20)] +>>> b = range(20) +>>> corrDf = sqlContext.createDataFrame(zip(a, b)) +>>> corrDf = corrDf.agg(corr(corrDf._1, corrDf._2).alias('c')) +>>> corrDf.selectExpr('abs(c - 0.9572339139475857) < 1e-16 as t').collect() +[Row(t=True)] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.corr(_to_java_column(col1), _to_java_column(col2))) + + @since(1.3) def countDistinct(col, *cols): """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
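The Python wrapper simply forwards to the JVM-side functions.corr aggregate; a roughly equivalent Scala call, assuming a hypothetical DataFrame df with numeric columns x and y, would be:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.corr

    // Pearson correlation coefficient of columns x and y, computed as an aggregate.
    def pearson(df: DataFrame): Double =
      df.agg(corr(df("x"), df("y")).alias("c")).head().getDouble(0)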
spark git commit: [SPARK-9830][SPARK-11641][SQL][FOLLOW-UP] Remove AggregateExpression1 and update toString of Exchange
Repository: spark Updated Branches: refs/heads/branch-1.6 b79c1bd1e -> ff7d869c4 [SPARK-9830][SPARK-11641][SQL][FOLLOW-UP] Remove AggregateExpression1 and update toString of Exchange https://issues.apache.org/jira/browse/SPARK-9830 This is the follow-up pr for https://github.com/apache/spark/pull/9556 to address davies' comments. Author: Yin HuaiCloses #9607 from yhuai/removeAgg1-followup. (cherry picked from commit 3121e78168808c015fb21da8b0d44bb33649fb81) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7d869c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7d869c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7d869c Branch: refs/heads/branch-1.6 Commit: ff7d869c47d8fbb88d0a0e3f9c431e6ff1e45390 Parents: b79c1bd Author: Yin Huai Authored: Tue Nov 10 16:25:22 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 10 16:25:29 2015 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 58 ++--- .../expressions/aggregate/Average.scala | 2 +- .../aggregate/CentralMomentAgg.scala| 2 +- .../catalyst/expressions/aggregate/Stddev.scala | 2 +- .../catalyst/expressions/aggregate/Sum.scala| 2 +- .../catalyst/analysis/AnalysisErrorSuite.scala | 127 +++ .../scala/org/apache/spark/sql/SQLConf.scala| 1 + .../apache/spark/sql/execution/Exchange.scala | 8 +- .../apache/spark/sql/execution/commands.scala | 10 ++ 10 files changed, 160 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d869c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b1e1439..a9cd9a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -532,7 +532,7 @@ class Analyzer( case min: Min if isDistinct => AggregateExpression(min, Complete, isDistinct = false) // We get an aggregate function, we need to wrap it in an AggregateExpression. -case agg2: AggregateFunction => AggregateExpression(agg2, Complete, isDistinct) +case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct) // This function is not an aggregate function, just return the resolved one. case other => other } http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d869c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 8322e99..5a4b0c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -110,17 +110,21 @@ trait CheckAnalysis { case Aggregate(groupingExprs, aggregateExprs, child) => def checkValidAggregateExpression(expr: Expression): Unit = expr match { case aggExpr: AggregateExpression => -// TODO: Is it possible that the child of a agg function is another -// agg function? 
-aggExpr.aggregateFunction.children.foreach { - // This is just a sanity check, our analysis rule PullOutNondeterministic should - // already pull out those nondeterministic expressions and evaluate them in - // a Project node. - case child if !child.deterministic => +aggExpr.aggregateFunction.children.foreach { child => + child.foreach { +case agg: AggregateExpression => + failAnalysis( +s"It is not allowed to use an aggregate function in the argument of " + + s"another aggregate function. Please use the inner aggregate function " + + s"in a sub-query.") +case other => // OK + } + + if (!child.deterministic) {
spark git commit: [SPARK-7316][MLLIB] RDD sliding window with step
Repository: spark Updated Branches: refs/heads/branch-1.6 80641c4fa -> f0180106a [SPARK-7316][MLLIB] RDD sliding window with step Implementation of step capability for sliding window function in MLlib's RDD. Though one can use current sliding window with step 1 and then filter every Nth window, it will take more time and space (N*data.count times more than needed). For example, below are the results for various windows and steps on 10M data points: Window | Step | Time | Windows produced | - | -- | -- 128 | 1 | 6.38 | 873 128 | 10 | 0.9 | 88 128 | 100 | 0.41 | 9 1024 | 1 | 44.67 | 9998977 1024 | 10 | 4.74 | 999898 1024 | 100 | 0.78 | 0 ``` import org.apache.spark.mllib.rdd.RDDFunctions._ val rdd = sc.parallelize(1 to 1000, 10) rdd.count val window = 1024 val step = 1 val t = System.nanoTime(); val windows = rdd.sliding(window, step); println(windows.count); println((System.nanoTime() - t) / 1e9) ``` Author: unknownAuthor: Alexander Ulanov Author: Xiangrui Meng Closes #5855 from avulanov/SPARK-7316-sliding. (cherry picked from commit dba1a62cf1baa9ae1ee665d592e01dfad78331a2) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0180106 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0180106 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0180106 Branch: refs/heads/branch-1.6 Commit: f0180106a044a7e700d7c0d3818805968819946d Parents: 80641c4 Author: unknown Authored: Tue Nov 10 14:25:06 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 14:25:19 2015 -0800 -- .../apache/spark/mllib/rdd/RDDFunctions.scala | 11 ++- .../org/apache/spark/mllib/rdd/SlidingRDD.scala | 71 +++- .../spark/mllib/rdd/RDDFunctionsSuite.scala | 11 +-- 3 files changed, 54 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0180106/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index 7817284..19a047d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -37,16 +37,21 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable { * trigger a Spark job if the parent RDD has more than one partitions and the window size is * greater than 1. */ - def sliding(windowSize: Int): RDD[Array[T]] = { + def sliding(windowSize: Int, step: Int): RDD[Array[T]] = { require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.") -if (windowSize == 1) { +if (windowSize == 1 && step == 1) { self.map(Array(_)) } else { - new SlidingRDD[T](self, windowSize) + new SlidingRDD[T](self, windowSize, step) } } /** + * [[sliding(Int, Int)*]] with step = 1. + */ + def sliding(windowSize: Int): RDD[Array[T]] = sliding(windowSize, 1) + + /** * Reduces the elements of this RDD in a multi-level tree pattern. 
* * @param depth suggested depth of the tree (default: 2) http://git-wip-us.apache.org/repos/asf/spark/blob/f0180106/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala index 1facf83..ead8db6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala @@ -24,13 +24,13 @@ import org.apache.spark.{TaskContext, Partition} import org.apache.spark.rdd.RDD private[mllib] -class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int) extends Partition with Serializable { override val index: Int = idx } /** - * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * Represents an RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding * window over them. The ordering is first based on the partition index and then the ordering of * items within each partition. This is similar to sliding in Scala collections, except that it * becomes an empty RDD if the
spark git commit: [SPARK-7316][MLLIB] RDD sliding window with step
Repository: spark Updated Branches: refs/heads/master 18350a570 -> dba1a62cf [SPARK-7316][MLLIB] RDD sliding window with step Implementation of step capability for sliding window function in MLlib's RDD. Though one can use current sliding window with step 1 and then filter every Nth window, it will take more time and space (N*data.count times more than needed). For example, below are the results for various windows and steps on 10M data points: Window | Step | Time | Windows produced | - | -- | -- 128 | 1 | 6.38 | 873 128 | 10 | 0.9 | 88 128 | 100 | 0.41 | 9 1024 | 1 | 44.67 | 9998977 1024 | 10 | 4.74 | 999898 1024 | 100 | 0.78 | 0 ``` import org.apache.spark.mllib.rdd.RDDFunctions._ val rdd = sc.parallelize(1 to 1000, 10) rdd.count val window = 1024 val step = 1 val t = System.nanoTime(); val windows = rdd.sliding(window, step); println(windows.count); println((System.nanoTime() - t) / 1e9) ``` Author: unknownAuthor: Alexander Ulanov Author: Xiangrui Meng Closes #5855 from avulanov/SPARK-7316-sliding. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba1a62c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba1a62c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba1a62c Branch: refs/heads/master Commit: dba1a62cf1baa9ae1ee665d592e01dfad78331a2 Parents: 18350a5 Author: unknown Authored: Tue Nov 10 14:25:06 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 14:25:06 2015 -0800 -- .../apache/spark/mllib/rdd/RDDFunctions.scala | 11 ++- .../org/apache/spark/mllib/rdd/SlidingRDD.scala | 71 +++- .../spark/mllib/rdd/RDDFunctionsSuite.scala | 11 +-- 3 files changed, 54 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index 7817284..19a047d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -37,16 +37,21 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable { * trigger a Spark job if the parent RDD has more than one partitions and the window size is * greater than 1. */ - def sliding(windowSize: Int): RDD[Array[T]] = { + def sliding(windowSize: Int, step: Int): RDD[Array[T]] = { require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.") -if (windowSize == 1) { +if (windowSize == 1 && step == 1) { self.map(Array(_)) } else { - new SlidingRDD[T](self, windowSize) + new SlidingRDD[T](self, windowSize, step) } } /** + * [[sliding(Int, Int)*]] with step = 1. + */ + def sliding(windowSize: Int): RDD[Array[T]] = sliding(windowSize, 1) + + /** * Reduces the elements of this RDD in a multi-level tree pattern. 
* * @param depth suggested depth of the tree (default: 2) http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala index 1facf83..ead8db6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala @@ -24,13 +24,13 @@ import org.apache.spark.{TaskContext, Partition} import org.apache.spark.rdd.RDD private[mllib] -class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int) extends Partition with Serializable { override val index: Int = idx } /** - * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * Represents an RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding * window over them. The ordering is first based on the partition index and then the ordering of * items within each partition. This is similar to sliding in Scala collections, except that it * becomes an empty RDD if the window size is greater than the total number of items. It needs to @@ -40,19 +40,24 @@ class SlidingRDDPartition[T](val idx:
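Editor's note: beyond the benchmark in the commit message, the semantics of the new two-argument `sliding` are easiest to see on a tiny dataset. A sketch, assuming the spark-shell where `sc` is predefined:

```
import org.apache.spark.mllib.rdd.RDDFunctions._

// One partition so the window order is easy to read.
val rdd = sc.parallelize(1 to 10, 1)

// Windows of 4 elements starting every 3 elements:
// Array(1,2,3,4), Array(4,5,6,7), Array(7,8,9,10); incomplete windows are dropped.
rdd.sliding(4, 3).collect().foreach(w => println(w.mkString(",")))

// The old single-argument form is kept and is equivalent to sliding(windowSize, 1).
println(rdd.sliding(4).count())
```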
spark git commit: [SPARK-11616][SQL] Improve toString for Dataset
Repository: spark Updated Branches: refs/heads/master dba1a62cf -> 724cf7a38 [SPARK-11616][SQL] Improve toString for Dataset Author: Michael ArmbrustCloses #9586 from marmbrus/dataset-toString. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/724cf7a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/724cf7a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/724cf7a3 Branch: refs/heads/master Commit: 724cf7a38c551bf2a79b87a8158bbe1725f9f888 Parents: dba1a62 Author: Michael Armbrust Authored: Tue Nov 10 14:30:19 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 14:30:19 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 14 ++-- .../scala/org/apache/spark/sql/Dataset.scala| 4 +-- .../apache/spark/sql/execution/Queryable.scala | 37 .../org/apache/spark/sql/DatasetSuite.scala | 5 +++ 4 files changed, 47 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/724cf7a3/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 9368435..691b476 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.encoders.Encoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} -import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.sources.HadoopFsRelation @@ -116,7 +116,8 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, -@DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable { +@DeveloperApi @transient val queryExecution: QueryExecution) + extends Queryable with Serializable { // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure // you wrap it with `withNewExecutionId` if this actions doesn't call other action. @@ -234,15 +235,6 @@ class DataFrame private[sql]( sb.toString() } - override def toString: String = { -try { - schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") -} catch { - case NonFatal(e) => -s"Invalid tree; ${e.getMessage}:\n$queryExecution" -} - } - /** * Returns the object itself. 
* @group basic http://git-wip-us.apache.org/repos/asf/spark/blob/724cf7a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6d2968e..a7e5ab1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.{Queryable, QueryExecution} import org.apache.spark.sql.types.StructType /** @@ -62,7 +62,7 @@ import org.apache.spark.sql.types.StructType class Dataset[T] private[sql]( @transient val sqlContext: SQLContext, @transient val queryExecution: QueryExecution, -unresolvedEncoder: Encoder[T]) extends Serializable { +unresolvedEncoder: Encoder[T]) extends Queryable with Serializable { /** The encoder for this [[Dataset]] that has been resolved to its output schema. */ private[sql] implicit val encoder: ExpressionEncoder[T] = unresolvedEncoder match { http://git-wip-us.apache.org/repos/asf/spark/blob/724cf7a3/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala -- diff --git
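Editor's note: the visible effect of moving `toString` into the shared `Queryable` trait is that a `Dataset` now renders its schema the way a `DataFrame` does, instead of the default object representation. A rough sketch, assuming a Spark 1.6 spark-shell with `sqlContext.implicits._` imported:

```
import sqlContext.implicits._

val ds = Seq(("a", 1), ("b", 2)).toDS()

// With the shared Queryable trait both print the "[name: type, ...]" schema form,
// e.g. "[_1: string, _2: int]".
println(ds.toString)
println(ds.toDF().toString)
```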
spark git commit: [SPARK-9818] Re-enable Docker tests for JDBC data source
Repository: spark Updated Branches: refs/heads/branch-1.6 6616f4da3 -> d0828d224 [SPARK-9818] Re-enable Docker tests for JDBC data source This patch re-enables tests for the Docker JDBC data source. These tests were reverted in #4872 due to transitive dependency conflicts introduced by the `docker-client` library. This patch should avoid those problems by using a version of `docker-client` which shades its transitive dependencies and by performing some build-magic to work around problems with that shaded JAR. In addition, I significantly refactored the tests to simplify the setup and teardown code and to fix several Docker networking issues which caused problems when running in `boot2docker`. Closes #8101. Author: Josh RosenAuthor: Yijie Shen Closes #9503 from JoshRosen/docker-jdbc-tests. (cherry picked from commit 1dde39d796bbf42336051a86bedf871c7fddd513) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0828d22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0828d22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0828d22 Branch: refs/heads/branch-1.6 Commit: d0828d224868b7320e414d8071218431b98bb0b2 Parents: 6616f4d Author: Josh Rosen Authored: Tue Nov 10 15:58:30 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 10 15:58:41 2015 -0800 -- docker-integration-tests/pom.xml| 149 + .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 160 +++ .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 153 ++ .../sql/jdbc/PostgresIntegrationSuite.scala | 82 ++ .../org/apache/spark/util/DockerUtils.scala | 68 pom.xml | 14 ++ project/SparkBuild.scala| 14 +- .../java/org/apache/spark/tags/DockerTest.java | 26 +++ 8 files changed, 664 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d0828d22/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml new file mode 100644 index 000..dee0c4a --- /dev/null +++ b/docker-integration-tests/pom.xml @@ -0,0 +1,149 @@ + + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.spark +spark-parent_2.10 +1.6.0-SNAPSHOT +../pom.xml + + + spark-docker-integration-tests_2.10 + jar + Spark Project Docker Integration Tests + http://spark.apache.org/ + +docker-integration-tests + + + + + com.spotify + docker-client + shaded + test + + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.datatype + jackson-datatype-guava + + + com.fasterxml.jackson.core + jackson-databind + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish.jersey.connectors + jersey-apache-connector + + + org.glassfish.jersey.media + jersey-media-json-jackson + + + + + + com.google.guava + guava + 18.0 + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-test-tags_${scala.binary.version} + ${project.version} + test + + + + com.sun.jersey + jersey-server + 1.19 + test + + + com.sun.jersey + jersey-core + 1.19 + test + + + com.sun.jersey + jersey-servlet + 
1.19 + test + + + com.sun.jersey + jersey-json + 1.19 + test + + + stax + stax-api + + + + + + http://git-wip-us.apache.org/repos/asf/spark/blob/d0828d22/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
spark git commit: [SPARK-9818] Re-enable Docker tests for JDBC data source
Repository: spark Updated Branches: refs/heads/master 32790fe72 -> 1dde39d79 [SPARK-9818] Re-enable Docker tests for JDBC data source This patch re-enables tests for the Docker JDBC data source. These tests were reverted in #4872 due to transitive dependency conflicts introduced by the `docker-client` library. This patch should avoid those problems by using a version of `docker-client` which shades its transitive dependencies and by performing some build-magic to work around problems with that shaded JAR. In addition, I significantly refactored the tests to simplify the setup and teardown code and to fix several Docker networking issues which caused problems when running in `boot2docker`. Closes #8101. Author: Josh RosenAuthor: Yijie Shen Closes #9503 from JoshRosen/docker-jdbc-tests. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dde39d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dde39d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dde39d7 Branch: refs/heads/master Commit: 1dde39d796bbf42336051a86bedf871c7fddd513 Parents: 32790fe Author: Josh Rosen Authored: Tue Nov 10 15:58:30 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 10 15:58:30 2015 -0800 -- docker-integration-tests/pom.xml| 149 + .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 160 +++ .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 153 ++ .../sql/jdbc/PostgresIntegrationSuite.scala | 82 ++ .../org/apache/spark/util/DockerUtils.scala | 68 pom.xml | 14 ++ project/SparkBuild.scala| 14 +- .../java/org/apache/spark/tags/DockerTest.java | 26 +++ 8 files changed, 664 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml new file mode 100644 index 000..dee0c4a --- /dev/null +++ b/docker-integration-tests/pom.xml @@ -0,0 +1,149 @@ + + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.spark +spark-parent_2.10 +1.6.0-SNAPSHOT +../pom.xml + + + spark-docker-integration-tests_2.10 + jar + Spark Project Docker Integration Tests + http://spark.apache.org/ + +docker-integration-tests + + + + + com.spotify + docker-client + shaded + test + + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.datatype + jackson-datatype-guava + + + com.fasterxml.jackson.core + jackson-databind + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish.jersey.connectors + jersey-apache-connector + + + org.glassfish.jersey.media + jersey-media-json-jackson + + + + + + com.google.guava + guava + 18.0 + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-test-tags_${scala.binary.version} + ${project.version} + test + + + + com.sun.jersey + jersey-server + 1.19 + test + + + com.sun.jersey + jersey-core + 1.19 + test + + + com.sun.jersey + jersey-servlet + 1.19 + test + + + com.sun.jersey + jersey-json + 1.19 + test + + + stax + stax-api + + + + + + 
http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala -- diff --git
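Editor's note: the suites added here spin up MySQL and PostgreSQL containers and exercise the ordinary JDBC data source against them. The read path they cover looks roughly like the sketch below; the URL, table name, and credentials are placeholders, and the matching JDBC driver must be on the classpath.

```
import java.util.Properties

val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

// Load a table from a (hypothetical) PostgreSQL instance into a DataFrame.
val people = sqlContext.read.jdbc(
  "jdbc:postgresql://localhost:5432/testdb", "people", props)
people.printSchema()
```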
spark git commit: [SPARK-5565][ML] LDA wrapper for Pipelines API
Repository: spark Updated Branches: refs/heads/master 1dde39d79 -> e281b8739 [SPARK-5565][ML] LDA wrapper for Pipelines API This adds LDA to spark.ml, the Pipelines API. It follows the design doc in the JIRA: [https://issues.apache.org/jira/browse/SPARK-5565], with one major change: * I eliminated doc IDs. These are not necessary with DataFrames since the user can add an ID column as needed. Note: This will conflict with [https://github.com/apache/spark/pull/9484], but I'll try to merge [https://github.com/apache/spark/pull/9484] first and then rebase this PR. CC: hhbyyh feynmanliang If you have a chance to make a pass, that'd be really helpful--thanks! Now that I'm done traveling & this PR is almost ready, I'll see about reviewing other PRs critical for 1.6. CC: mengxr Author: Joseph K. BradleyCloses #9513 from jkbradley/lda-pipelines. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e281b873 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e281b873 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e281b873 Branch: refs/heads/master Commit: e281b87398f1298cc3df8e0409c7040acdddce03 Parents: 1dde39d Author: Joseph K. Bradley Authored: Tue Nov 10 16:20:10 2015 -0800 Committer: Joseph K. Bradley Committed: Tue Nov 10 16:20:10 2015 -0800 -- .../org/apache/spark/ml/clustering/LDA.scala| 701 +++ .../spark/mllib/clustering/LDAModel.scala | 29 +- .../apache/spark/ml/clustering/LDASuite.scala | 221 ++ 3 files changed, 946 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e281b873/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala new file mode 100644 index 000..f66233e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -0,0 +1,701 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of topics (clusters) to infer", +ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If not set by the user, then docConcentration is set automatically. If set to + * singleton vector [alpha], then alpha is replicated to a vector of length k in
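Editor's note: usage of the new spark.ml estimator follows the usual fit/transform pattern, and no document-ID column is required. A minimal sketch, assuming a Spark 1.6 spark-shell; the toy term-count vectors are made up:

```
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Each row is a document represented as a vector of term counts ("features").
val corpus = sqlContext.createDataFrame(Seq(
  (0L, Vectors.dense(1.0, 0.0, 3.0, 2.0)),
  (1L, Vectors.dense(0.0, 2.0, 1.0, 0.0)),
  (2L, Vectors.dense(4.0, 0.0, 0.0, 1.0))
)).toDF("id", "features")

val lda = new LDA().setK(2).setMaxIter(10)
val model = lda.fit(corpus)

model.describeTopics(3).show()   // top terms per topic
model.transform(corpus).show()   // adds a topicDistribution column
```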
spark git commit: [SPARK-5565][ML] LDA wrapper for Pipelines API
Repository: spark Updated Branches: refs/heads/branch-1.6 d0828d224 -> b79c1bd1e [SPARK-5565][ML] LDA wrapper for Pipelines API This adds LDA to spark.ml, the Pipelines API. It follows the design doc in the JIRA: [https://issues.apache.org/jira/browse/SPARK-5565], with one major change: * I eliminated doc IDs. These are not necessary with DataFrames since the user can add an ID column as needed. Note: This will conflict with [https://github.com/apache/spark/pull/9484], but I'll try to merge [https://github.com/apache/spark/pull/9484] first and then rebase this PR. CC: hhbyyh feynmanliang If you have a chance to make a pass, that'd be really helpful--thanks! Now that I'm done traveling & this PR is almost ready, I'll see about reviewing other PRs critical for 1.6. CC: mengxr Author: Joseph K. BradleyCloses #9513 from jkbradley/lda-pipelines. (cherry picked from commit e281b87398f1298cc3df8e0409c7040acdddce03) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b79c1bd1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b79c1bd1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b79c1bd1 Branch: refs/heads/branch-1.6 Commit: b79c1bd1eee79de1c63daeb0d93f2cdd577e9509 Parents: d0828d2 Author: Joseph K. Bradley Authored: Tue Nov 10 16:20:10 2015 -0800 Committer: Joseph K. Bradley Committed: Tue Nov 10 16:20:19 2015 -0800 -- .../org/apache/spark/ml/clustering/LDA.scala| 701 +++ .../spark/mllib/clustering/LDAModel.scala | 29 +- .../apache/spark/ml/clustering/LDASuite.scala | 221 ++ 3 files changed, 946 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b79c1bd1/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala new file mode 100644 index 000..f66233e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -0,0 +1,701 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of topics (clusters) to infer", +ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If not set by the user, then
spark git commit: [SPARK-11616][SQL] Improve toString for Dataset
Repository: spark Updated Branches: refs/heads/branch-1.6 f0180106a -> 8fb7b8304 [SPARK-11616][SQL] Improve toString for Dataset Author: Michael ArmbrustCloses #9586 from marmbrus/dataset-toString. (cherry picked from commit 724cf7a38c551bf2a79b87a8158bbe1725f9f888) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fb7b830 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fb7b830 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fb7b830 Branch: refs/heads/branch-1.6 Commit: 8fb7b8304cd828550a63dac7f1de9fb5ae004ecc Parents: f018010 Author: Michael Armbrust Authored: Tue Nov 10 14:30:19 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 14:30:41 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 14 ++-- .../scala/org/apache/spark/sql/Dataset.scala| 4 +-- .../apache/spark/sql/execution/Queryable.scala | 37 .../org/apache/spark/sql/DatasetSuite.scala | 5 +++ 4 files changed, 47 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8fb7b830/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 9368435..691b476 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.encoders.Encoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} -import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.sources.HadoopFsRelation @@ -116,7 +116,8 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, -@DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable { +@DeveloperApi @transient val queryExecution: QueryExecution) + extends Queryable with Serializable { // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure // you wrap it with `withNewExecutionId` if this actions doesn't call other action. @@ -234,15 +235,6 @@ class DataFrame private[sql]( sb.toString() } - override def toString: String = { -try { - schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") -} catch { - case NonFatal(e) => -s"Invalid tree; ${e.getMessage}:\n$queryExecution" -} - } - /** * Returns the object itself. 
* @group basic http://git-wip-us.apache.org/repos/asf/spark/blob/8fb7b830/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6d2968e..a7e5ab1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.{Queryable, QueryExecution} import org.apache.spark.sql.types.StructType /** @@ -62,7 +62,7 @@ import org.apache.spark.sql.types.StructType class Dataset[T] private[sql]( @transient val sqlContext: SQLContext, @transient val queryExecution: QueryExecution, -unresolvedEncoder: Encoder[T]) extends Serializable { +unresolvedEncoder: Encoder[T]) extends Queryable with Serializable { /** The encoder for this [[Dataset]] that has been resolved to its output schema. */ private[sql] implicit val encoder: ExpressionEncoder[T] = unresolvedEncoder match {
spark git commit: [SPARK-9830][SPARK-11641][SQL][FOLLOW-UP] Remove AggregateExpression1 and update toString of Exchange
Repository: spark Updated Branches: refs/heads/master e281b8739 -> 3121e7816 [SPARK-9830][SPARK-11641][SQL][FOLLOW-UP] Remove AggregateExpression1 and update toString of Exchange https://issues.apache.org/jira/browse/SPARK-9830 This is the follow-up pr for https://github.com/apache/spark/pull/9556 to address davies' comments. Author: Yin HuaiCloses #9607 from yhuai/removeAgg1-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3121e781 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3121e781 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3121e781 Branch: refs/heads/master Commit: 3121e78168808c015fb21da8b0d44bb33649fb81 Parents: e281b87 Author: Yin Huai Authored: Tue Nov 10 16:25:22 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 10 16:25:22 2015 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 58 ++--- .../expressions/aggregate/Average.scala | 2 +- .../aggregate/CentralMomentAgg.scala| 2 +- .../catalyst/expressions/aggregate/Stddev.scala | 2 +- .../catalyst/expressions/aggregate/Sum.scala| 2 +- .../catalyst/analysis/AnalysisErrorSuite.scala | 127 +++ .../scala/org/apache/spark/sql/SQLConf.scala| 1 + .../apache/spark/sql/execution/Exchange.scala | 8 +- .../apache/spark/sql/execution/commands.scala | 10 ++ 10 files changed, 160 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3121e781/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b1e1439..a9cd9a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -532,7 +532,7 @@ class Analyzer( case min: Min if isDistinct => AggregateExpression(min, Complete, isDistinct = false) // We get an aggregate function, we need to wrap it in an AggregateExpression. -case agg2: AggregateFunction => AggregateExpression(agg2, Complete, isDistinct) +case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct) // This function is not an aggregate function, just return the resolved one. case other => other } http://git-wip-us.apache.org/repos/asf/spark/blob/3121e781/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 8322e99..5a4b0c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -110,17 +110,21 @@ trait CheckAnalysis { case Aggregate(groupingExprs, aggregateExprs, child) => def checkValidAggregateExpression(expr: Expression): Unit = expr match { case aggExpr: AggregateExpression => -// TODO: Is it possible that the child of a agg function is another -// agg function? -aggExpr.aggregateFunction.children.foreach { - // This is just a sanity check, our analysis rule PullOutNondeterministic should - // already pull out those nondeterministic expressions and evaluate them in - // a Project node. 
- case child if !child.deterministic => +aggExpr.aggregateFunction.children.foreach { child => + child.foreach { +case agg: AggregateExpression => + failAnalysis( +s"It is not allowed to use an aggregate function in the argument of " + + s"another aggregate function. Please use the inner aggregate function " + + s"in a sub-query.") +case other => // OK + } + + if (!child.deterministic) { failAnalysis( s"nondeterministic expression ${expr.prettyString} should not " +
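Editor's note: what the new `CheckAnalysis` rule rejects, and the rewrite it suggests, can be illustrated with a small query. This is a sketch; the table `t(key, value)` is hypothetical.

```
// Nesting an aggregate inside another aggregate now fails during analysis:
//   sqlContext.sql("SELECT sum(max(value)) FROM t GROUP BY key")
//   => AnalysisException: "It is not allowed to use an aggregate function in the
//      argument of another aggregate function. Please use the inner aggregate
//      function in a sub-query."
// The suggested form computes the inner aggregate in a sub-query first:
val ok = sqlContext.sql(
  "SELECT sum(m) FROM (SELECT key, max(value) AS m FROM t GROUP BY key) tmp")
```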
spark git commit: [SPARK-11550][DOCS] Replace example code in mllib-optimization.md using include_example
Repository: spark Updated Branches: refs/heads/master 724cf7a38 -> 638c51d93 [SPARK-11550][DOCS] Replace example code in mllib-optimization.md using include_example Author: Pravin GadakhCloses #9516 from pravingadakh/SPARK-11550. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/638c51d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/638c51d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/638c51d9 Branch: refs/heads/master Commit: 638c51d9380081b3b8182be2c2460bd53b8b0a4f Parents: 724cf7a Author: Pravin Gadakh Authored: Tue Nov 10 14:47:04 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 14:47:04 2015 -0800 -- docs/mllib-optimization.md | 145 +-- .../spark/examples/mllib/JavaLBFGSExample.java | 108 ++ .../spark/examples/mllib/LBFGSExample.scala | 90 3 files changed, 200 insertions(+), 143 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/638c51d9/docs/mllib-optimization.md -- diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md index a3bd130..ad7bcd9 100644 --- a/docs/mllib-optimization.md +++ b/docs/mllib-optimization.md @@ -220,154 +220,13 @@ L-BFGS optimizer. Refer to the [`LBFGS` Scala docs](api/scala/index.html#org.apache.spark.mllib.optimization.LBFGS) and [`SquaredL2Updater` Scala docs](api/scala/index.html#org.apache.spark.mllib.optimization.SquaredL2Updater) for details on the API. -{% highlight scala %} -import org.apache.spark.SparkContext -import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.mllib.classification.LogisticRegressionModel -import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater} - -val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") -val numFeatures = data.take(1)(0).features.size - -// Split data into training (60%) and test (40%). -val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) - -// Append 1 into the training data as intercept. -val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache() - -val test = splits(1) - -// Run training algorithm to build the model -val numCorrections = 10 -val convergenceTol = 1e-4 -val maxNumIterations = 20 -val regParam = 0.1 -val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1)) - -val (weightsWithIntercept, loss) = LBFGS.runLBFGS( - training, - new LogisticGradient(), - new SquaredL2Updater(), - numCorrections, - convergenceTol, - maxNumIterations, - regParam, - initialWeightsWithIntercept) - -val model = new LogisticRegressionModel( - Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)), - weightsWithIntercept(weightsWithIntercept.size - 1)) - -// Clear the default threshold. -model.clearThreshold() - -// Compute raw scores on the test set. -val scoreAndLabels = test.map { point => - val score = model.predict(point.features) - (score, point.label) -} - -// Get evaluation metrics. 
-val metrics = new BinaryClassificationMetrics(scoreAndLabels) -val auROC = metrics.areaUnderROC() - -println("Loss of each step in training process") -loss.foreach(println) -println("Area under ROC = " + auROC) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/LBFGSExample.scala %} Refer to the [`LBFGS` Java docs](api/java/org/apache/spark/mllib/optimization/LBFGS.html) and [`SquaredL2Updater` Java docs](api/java/org/apache/spark/mllib/optimization/SquaredL2Updater.html) for details on the API. -{% highlight java %} -import java.util.Arrays; -import java.util.Random; - -import scala.Tuple2; - -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.classification.LogisticRegressionModel; -import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.mllib.optimization.*; -import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.util.MLUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; - -public class LBFGSExample { - public static void main(String[] args) { -SparkConf conf = new SparkConf().setAppName("L-BFGS Example"); -SparkContext sc = new SparkContext(conf); -String path = "data/mllib/sample_libsvm_data.txt"; -JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); -
spark git commit: [SPARK-11550][DOCS] Replace example code in mllib-optimization.md using include_example
Repository: spark Updated Branches: refs/heads/branch-1.6 8fb7b8304 -> 6a74efab0 [SPARK-11550][DOCS] Replace example code in mllib-optimization.md using include_example Author: Pravin GadakhCloses #9516 from pravingadakh/SPARK-11550. (cherry picked from commit 638c51d9380081b3b8182be2c2460bd53b8b0a4f) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a74efab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a74efab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a74efab Branch: refs/heads/branch-1.6 Commit: 6a74efab0e4c2f41c19412aa097df64b58f36a2c Parents: 8fb7b83 Author: Pravin Gadakh Authored: Tue Nov 10 14:47:04 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 14:47:10 2015 -0800 -- docs/mllib-optimization.md | 145 +-- .../spark/examples/mllib/JavaLBFGSExample.java | 108 ++ .../spark/examples/mllib/LBFGSExample.scala | 90 3 files changed, 200 insertions(+), 143 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a74efab/docs/mllib-optimization.md -- diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md index a3bd130..ad7bcd9 100644 --- a/docs/mllib-optimization.md +++ b/docs/mllib-optimization.md @@ -220,154 +220,13 @@ L-BFGS optimizer. Refer to the [`LBFGS` Scala docs](api/scala/index.html#org.apache.spark.mllib.optimization.LBFGS) and [`SquaredL2Updater` Scala docs](api/scala/index.html#org.apache.spark.mllib.optimization.SquaredL2Updater) for details on the API. -{% highlight scala %} -import org.apache.spark.SparkContext -import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.mllib.classification.LogisticRegressionModel -import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater} - -val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") -val numFeatures = data.take(1)(0).features.size - -// Split data into training (60%) and test (40%). -val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) - -// Append 1 into the training data as intercept. -val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache() - -val test = splits(1) - -// Run training algorithm to build the model -val numCorrections = 10 -val convergenceTol = 1e-4 -val maxNumIterations = 20 -val regParam = 0.1 -val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1)) - -val (weightsWithIntercept, loss) = LBFGS.runLBFGS( - training, - new LogisticGradient(), - new SquaredL2Updater(), - numCorrections, - convergenceTol, - maxNumIterations, - regParam, - initialWeightsWithIntercept) - -val model = new LogisticRegressionModel( - Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)), - weightsWithIntercept(weightsWithIntercept.size - 1)) - -// Clear the default threshold. -model.clearThreshold() - -// Compute raw scores on the test set. -val scoreAndLabels = test.map { point => - val score = model.predict(point.features) - (score, point.label) -} - -// Get evaluation metrics. 
-val metrics = new BinaryClassificationMetrics(scoreAndLabels) -val auROC = metrics.areaUnderROC() - -println("Loss of each step in training process") -loss.foreach(println) -println("Area under ROC = " + auROC) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/LBFGSExample.scala %} Refer to the [`LBFGS` Java docs](api/java/org/apache/spark/mllib/optimization/LBFGS.html) and [`SquaredL2Updater` Java docs](api/java/org/apache/spark/mllib/optimization/SquaredL2Updater.html) for details on the API. -{% highlight java %} -import java.util.Arrays; -import java.util.Random; - -import scala.Tuple2; - -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.classification.LogisticRegressionModel; -import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.mllib.optimization.*; -import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.util.MLUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; - -public class LBFGSExample { - public static void main(String[] args) { -SparkConf conf = new SparkConf().setAppName("L-BFGS Example"); -SparkContext sc = new
spark git commit: [SPARK-9241][SQL] Supporting multiple DISTINCT columns - follow-up (3)
Repository: spark Updated Branches: refs/heads/branch-1.6 ff7d869c4 -> ce5aba32f [SPARK-9241][SQL] Supporting multiple DISTINCT columns - follow-up (3) This PR is a 2nd follow-up for [SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the following improvements: * Fix for a potential bug in distinct child expression and attribute alignment. * Improved handling of duplicate distinct child expressions. * Added test for distinct UDAF with multiple children. cc yhuai Author: Herman van HovellCloses #9566 from hvanhovell/SPARK-9241-followup-2. (cherry picked from commit 21c562fa03430365f5c2b7d6de1f8f60ab2140d4) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce5aba32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce5aba32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce5aba32 Branch: refs/heads/branch-1.6 Commit: ce5aba32f0376c99a81b170ff10acb209afaa795 Parents: ff7d869 Author: Herman van Hovell Authored: Tue Nov 10 16:28:21 2015 -0800 Committer: Yin Huai Committed: Tue Nov 10 16:28:37 2015 -0800 -- .../analysis/DistinctAggregationRewriter.scala | 9 +++-- .../hive/execution/AggregationQuerySuite.scala | 41 ++-- 2 files changed, 42 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ce5aba32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 397eff0..c0c9604 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -151,11 +151,12 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP } // Setup unique distinct aggregate children. - val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq - val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair).toMap - val distinctAggChildAttrs = distinctAggChildAttrMap.values.toSeq + val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct + val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair) + val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2) // Setup expand & aggregate operators for distinct aggregate expressions. 
+ val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map { case ((group, expressions), i) => val id = Literal(i + 1) @@ -170,7 +171,7 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP val operators = expressions.map { e => val af = e.aggregateFunction val naf = patchAggregateFunctionChildren(af) { x => - evalWithinGroup(id, distinctAggChildAttrMap(x)) + evalWithinGroup(id, distinctAggChildAttrLookup(x)) } (e, e.copy(aggregateFunction = naf, isDistinct = false)) } http://git-wip-us.apache.org/repos/asf/spark/blob/ce5aba32/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 6bf2c53..8253921 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -66,6 +66,36 @@ class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFun } } +class LongProductSum extends UserDefinedAggregateFunction { + def inputSchema: StructType = new StructType() +.add("a", LongType) +.add("b", LongType) + + def bufferSchema: StructType = new StructType() +.add("product", LongType) + + def dataType: DataType = LongType + + def deterministic: Boolean = true + + def initialize(buffer: MutableAggregationBuffer): Unit = { +buffer(0) = 0L + } + + def update(buffer: MutableAggregationBuffer, input: Row): Unit = { +if (!(input.isNullAt(0) ||
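Editor's note: the query shape the improved rewriter targets is several DISTINCT aggregates in one Aggregate, possibly sharing a child expression, planned through a single Expand rather than one pass per distinct group. A sketch; the table `data(key, a, b)` is made up.

```
// count(DISTINCT a) and sum(DISTINCT a) share the child expression `a`;
// the rewriter now de-duplicates such children before building the Expand.
val q = sqlContext.sql(
  "SELECT key, count(DISTINCT a), count(DISTINCT b), sum(DISTINCT a) " +
  "FROM data GROUP BY key")
q.explain()   // inspect the rewritten plan
```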
spark git commit: [SPARK-11566] [MLLIB] [PYTHON] Refactoring GaussianMixtureModel.gaussians in Python
Repository: spark Updated Branches: refs/heads/master a3989058c -> c0e48dfa6 [SPARK-11566] [MLLIB] [PYTHON] Refactoring GaussianMixtureModel.gaussians in Python cc jkbradley Author: Yu ISHIKAWACloses #9534 from yu-iskw/SPARK-11566. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0e48dfa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0e48dfa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0e48dfa Branch: refs/heads/master Commit: c0e48dfa611fa5d94132af7e6f6731f60ab833da Parents: a398905 Author: Yu ISHIKAWA Authored: Tue Nov 10 16:42:28 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 16:42:28 2015 -0800 -- .../python/GaussianMixtureModelWrapper.scala| 21 ++-- python/pyspark/mllib/clustering.py | 2 +- 2 files changed, 7 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c0e48dfa/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala index 0ec88ef..6a3b20c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala @@ -17,14 +17,11 @@ package org.apache.spark.mllib.api.python -import java.util.{List => JList} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters import org.apache.spark.SparkContext -import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrix} import org.apache.spark.mllib.clustering.GaussianMixtureModel +import org.apache.spark.mllib.linalg.{Vector, Vectors} /** * Wrapper around GaussianMixtureModel to provide helper methods in Python @@ -36,17 +33,11 @@ private[python] class GaussianMixtureModelWrapper(model: GaussianMixtureModel) { /** * Returns gaussians as a List of Vectors and Matrices corresponding each MultivariateGaussian */ - val gaussians: JList[Object] = { -val modelGaussians = model.gaussians -var i = 0 -var mu = ArrayBuffer.empty[Vector] -var sigma = ArrayBuffer.empty[Matrix] -while (i < k) { - mu += modelGaussians(i).mu - sigma += modelGaussians(i).sigma - i += 1 + val gaussians: Array[Byte] = { +val modelGaussians = model.gaussians.map { gaussian => + Array[Any](gaussian.mu, gaussian.sigma) } -List(mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava +SerDe.dumps(JavaConverters.seqAsJavaListConverter(modelGaussians).asJava) } def save(sc: SparkContext, path: String): Unit = model.save(sc, path) http://git-wip-us.apache.org/repos/asf/spark/blob/c0e48dfa/python/pyspark/mllib/clustering.py -- diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 1fa061d..c9e6f1d 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -266,7 +266,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): """ return [ MultivariateGaussian(gaussian[0], gaussian[1]) -for gaussian in zip(*self.call("gaussians"))] +for gaussian in self.call("gaussians")] @property @since('1.4.0') - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
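Editor's note: on the JVM side, `gaussians` is the array of per-component `MultivariateGaussian`s that the refactored wrapper now serializes as (mu, sigma) pairs for Python. A sketch of what it exposes, in Scala, assuming the spark-shell; the toy points are made up:

```
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

// Two well-separated clusters of toy points.
val data = sc.parallelize(Seq(
  Vectors.dense(-5.0, -5.0), Vectors.dense(-4.5, -5.5), Vectors.dense(-5.5, -4.6),
  Vectors.dense(5.0, 5.0), Vectors.dense(4.6, 5.4), Vectors.dense(5.5, 4.5)))

val model = new GaussianMixture().setK(2).setSeed(42L).run(data)

// Each component is a MultivariateGaussian with a mean vector and covariance matrix.
model.gaussians.zip(model.weights).foreach { case (g, w) =>
  println(s"weight=$w mu=${g.mu}\nsigma=\n${g.sigma}")
}
```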
spark git commit: [SPARK-10192][CORE] simple test w/ failure involving a shared dependency
Repository: spark Updated Branches: refs/heads/master c0e48dfa6 -> 33112f9c4 [SPARK-10192][CORE] simple test w/ failure involving a shared dependency just trying to increase test coverage in the scheduler, this already works. It includes a regression test for SPARK-9809 copied some test utils from https://github.com/apache/spark/pull/5636, we can wait till that is merged first Author: Imran RashidCloses #8402 from squito/test_retry_in_shared_shuffle_dep. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33112f9c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33112f9c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33112f9c Branch: refs/heads/master Commit: 33112f9c48680c33d663978f76806ebf0ea39789 Parents: c0e48df Author: Imran Rashid Authored: Tue Nov 10 16:50:22 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:50:22 2015 -0800 -- .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++- 1 file changed, 49 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/33112f9c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3816b8c..068b49b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -594,11 +594,17 @@ class DAGSchedulerSuite * @param stageId - The current stageId * @param attemptIdx - The current attempt count */ - private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = { + private def completeNextResultStageWithSuccess( + stageId: Int, + attemptIdx: Int, + partitionToResult: Int => Int = _ => 42): Unit = { val stageAttempt = taskSets.last checkStageId(stageId, attemptIdx, stageAttempt) assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage]) -complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq) +val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, partitionToResult(idx)) +} +complete(stageAttempt, taskResults.toSeq) } /** @@ -1055,6 +1061,47 @@ class DAGSchedulerSuite } /** + * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which + * requires regenerating some outputs of the shared dependency. One key aspect of this test is + * that the second job actually uses a different stage for the shared dependency (a "skipped" + * stage). + */ + test("shuffle fetch failure in a reused shuffle dependency") { +// Run the first job successfully, which creates one shuffle dependency + +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) +val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduceRdd, Array(0, 1)) + +completeShuffleMapStageSuccessfully(0, 0, 2) +completeNextResultStageWithSuccess(1, 0) +assert(results === Map(0 -> 42, 1 -> 42)) +assertDataStructuresEmpty() + +// submit another job w/ the shared dependency, and have a fetch failure +val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduce2, Array(0, 1)) +// Note that the stage numbering here is only b/c the shared dependency produces a new, skipped +// stage. 
If instead it reused the existing stage, then this would be stage 2 +completeNextStageWithFetchFailure(3, 0, shuffleDep) +scheduler.resubmitFailedStages() + +// the scheduler now creates a new task set to regenerate the missing map output, but this time +// using a different stage, the "skipped" one + +// SPARK-9809 -- this stage is submitted without a task for each partition (because some of +// the shuffle map output is still available from stage 0); make sure we've still got internal +// accumulators setup +assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty) +completeShuffleMapStageSuccessfully(2, 0, 2) +completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) +assert(results === Map(0 -> 1234, 1 -> 1235)) + +assertDataStructuresEmpty() + } + + /** * This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we * have completions from both the first & second attempt of stage 1. So all the map output is * available before we finish any task set for stage 1. We want to make sure that we don't
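Editor's note: at the user level, the "shared dependency with a skipped stage" situation this test exercises arises whenever a shuffled RDD is reused across jobs. A rough sketch under those assumptions (hypothetical data, `sc` from spark-shell; the fetch failure itself would come from losing the executor that holds the map output, not from user code):

```scala
// One shuffle dependency, shared by two jobs.
val counts = sc.parallelize(1 to 1000).map(i => (i % 10, 1)).reduceByKey(_ + _)

counts.collect()          // job 1: runs the shuffle map stage, then a result stage
counts.map(_._2).sum()    // job 2: the map side shows up as a "skipped" stage -- its output already exists

// If a fetch in job 2 fails (for example the executor holding the map output dies), the
// scheduler has to resubmit the shared map stage under a new stage id, which is the
// scenario the new DAGSchedulerSuite test covers.
```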
spark git commit: [SPARK-10192][CORE] simple test w/ failure involving a shared dependency
Repository: spark Updated Branches: refs/heads/branch-1.6 d8bfc025c -> 10272d5c9 [SPARK-10192][CORE] simple test w/ failure involving a shared dependency just trying to increase test coverage in the scheduler, this already works. It includes a regression test for SPARK-9809 copied some test utils from https://github.com/apache/spark/pull/5636, we can wait till that is merged first Author: Imran RashidCloses #8402 from squito/test_retry_in_shared_shuffle_dep. (cherry picked from commit 33112f9c48680c33d663978f76806ebf0ea39789) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10272d5c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10272d5c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10272d5c Branch: refs/heads/branch-1.6 Commit: 10272d5c98694e5a0cfef2587a81be7ce609cbb7 Parents: d8bfc02 Author: Imran Rashid Authored: Tue Nov 10 16:50:22 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:50:34 2015 -0800 -- .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++- 1 file changed, 49 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/10272d5c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3816b8c..068b49b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -594,11 +594,17 @@ class DAGSchedulerSuite * @param stageId - The current stageId * @param attemptIdx - The current attempt count */ - private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = { + private def completeNextResultStageWithSuccess( + stageId: Int, + attemptIdx: Int, + partitionToResult: Int => Int = _ => 42): Unit = { val stageAttempt = taskSets.last checkStageId(stageId, attemptIdx, stageAttempt) assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage]) -complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq) +val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, partitionToResult(idx)) +} +complete(stageAttempt, taskResults.toSeq) } /** @@ -1055,6 +1061,47 @@ class DAGSchedulerSuite } /** + * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which + * requires regenerating some outputs of the shared dependency. One key aspect of this test is + * that the second job actually uses a different stage for the shared dependency (a "skipped" + * stage). + */ + test("shuffle fetch failure in a reused shuffle dependency") { +// Run the first job successfully, which creates one shuffle dependency + +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) +val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduceRdd, Array(0, 1)) + +completeShuffleMapStageSuccessfully(0, 0, 2) +completeNextResultStageWithSuccess(1, 0) +assert(results === Map(0 -> 42, 1 -> 42)) +assertDataStructuresEmpty() + +// submit another job w/ the shared dependency, and have a fetch failure +val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduce2, Array(0, 1)) +// Note that the stage numbering here is only b/c the shared dependency produces a new, skipped +// stage. 
If instead it reused the existing stage, then this would be stage 2 +completeNextStageWithFetchFailure(3, 0, shuffleDep) +scheduler.resubmitFailedStages() + +// the scheduler now creates a new task set to regenerate the missing map output, but this time +// using a different stage, the "skipped" one + +// SPARK-9809 -- this stage is submitted without a task for each partition (because some of +// the shuffle map output is still available from stage 0); make sure we've still got internal +// accumulators setup +assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty) +completeShuffleMapStageSuccessfully(2, 0, 2) +completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) +assert(results === Map(0 -> 1234, 1 -> 1235)) + +assertDataStructuresEmpty() + } + + /** * This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we * have completions from both the first & second attempt of stage 1.
spark git commit: [SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz
Repository: spark Updated Branches: refs/heads/master 900917541 -> 6600786dd [SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz Currently, when a DStream sets the scope for RDD generated by it, that scope is not allowed to be overridden by the RDD operations. So in case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope - `foreachRDD `, as set by the `ForeachDStream`. So it is hard to debug generated RDDs in the RDD DAG viz in the Spark UI. This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the earlier DStream scope. I have also slightly tweaked how callsites are set such that the short callsite reflects the RDD operation name and line number. This tweak is necessary as callsites are not managed through scopes (which support nesting and overriding) and I didnt want to add another local property to control nesting and overriding of callsites. ## Before: ![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png) ## After: ![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png) The code that was used to generate this is: ``` val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => val temp = rdd.map { _ -> 1 }.reduceByKey( _ + _) val temp2 = temp.map { _ -> 1}.reduceByKey(_ + _) val count = temp2.count println(count) } ``` Note - The inner scopes of the RDD operations map/reduceByKey inside foreachRDD is visible - The short callsites of stages refers to the line number of the RDD ops rather than the same line number of foreachRDD in all three cases. Author: Tathagata DasCloses #9315 from tdas/SPARK-11361. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6600786d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6600786d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6600786d Branch: refs/heads/master Commit: 6600786dddc89cb16779ee56b9173f63a3af3f27 Parents: 9009175 Author: Tathagata Das Authored: Tue Nov 10 16:54:06 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:54:06 2015 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 9 +-- .../spark/streaming/TestOutputStream.scala | 2 +- .../spark/streaming/dstream/DStream.scala | 63 +--- .../streaming/dstream/ForEachDStream.scala | 14 +++- .../streaming/dstream/TransformedDStream.scala | 13 .../spark/streaming/DStreamScopeSuite.scala | 75 .../apache/spark/streaming/TestSuiteBase.scala | 4 +- 7 files changed, 147 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7421821..67270c3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1787,10 +1787,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * has overridden the call site using `setCallSite()`, this will return the user's version. 
*/ private[spark] def getCallSite(): CallSite = { -Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite => - val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("") - CallSite(shortCallSite, longCallSite) -}.getOrElse(Utils.getCallSite()) +val callSite = Utils.getCallSite() +CallSite( + Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm), + Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm) +) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala -- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala index 1a9..79077e4 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala +++
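Editor's note: the `getCallSite` tweak above means a user-set short call site no longer wipes out the auto-detected long form. A small sketch of the user-facing knob involved, assuming `sc` from spark-shell (the scope handling for `foreachRDD`/`transform` itself is internal to the streaming classes and not shown here):

```scala
sc.setCallSite("custom-count")   // overrides the short call-site name shown for stages below
val n = sc.parallelize(1 to 100).filter(_ % 2 == 0).count()  // UI stage is labelled "custom-count"
sc.clearCallSite()

// With this patch the long-form call site (file:line detail) still falls back to the
// auto-detected value instead of being replaced by an empty string.
```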
spark git commit: [SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz
Repository: spark Updated Branches: refs/heads/branch-1.6 fafeca36e -> b34819c7b [SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz Currently, when a DStream sets the scope for RDD generated by it, that scope is not allowed to be overridden by the RDD operations. So in case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope - `foreachRDD `, as set by the `ForeachDStream`. So it is hard to debug generated RDDs in the RDD DAG viz in the Spark UI. This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the earlier DStream scope. I have also slightly tweaked how callsites are set such that the short callsite reflects the RDD operation name and line number. This tweak is necessary as callsites are not managed through scopes (which support nesting and overriding) and I didnt want to add another local property to control nesting and overriding of callsites. ## Before: ![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png) ## After: ![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png) The code that was used to generate this is: ``` val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => val temp = rdd.map { _ -> 1 }.reduceByKey( _ + _) val temp2 = temp.map { _ -> 1}.reduceByKey(_ + _) val count = temp2.count println(count) } ``` Note - The inner scopes of the RDD operations map/reduceByKey inside foreachRDD is visible - The short callsites of stages refers to the line number of the RDD ops rather than the same line number of foreachRDD in all three cases. Author: Tathagata DasCloses #9315 from tdas/SPARK-11361. 
(cherry picked from commit 6600786dddc89cb16779ee56b9173f63a3af3f27) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b34819c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b34819c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b34819c7 Branch: refs/heads/branch-1.6 Commit: b34819c7b720d6407d428deca4b00a02ab7c3503 Parents: fafeca3 Author: Tathagata Das Authored: Tue Nov 10 16:54:06 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:54:23 2015 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 9 +-- .../spark/streaming/TestOutputStream.scala | 2 +- .../spark/streaming/dstream/DStream.scala | 63 +--- .../streaming/dstream/ForEachDStream.scala | 14 +++- .../streaming/dstream/TransformedDStream.scala | 13 .../spark/streaming/DStreamScopeSuite.scala | 75 .../apache/spark/streaming/TestSuiteBase.scala | 4 +- 7 files changed, 147 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b34819c7/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7421821..67270c3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1787,10 +1787,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * has overridden the call site using `setCallSite()`, this will return the user's version. */ private[spark] def getCallSite(): CallSite = { -Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite => - val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("") - CallSite(shortCallSite, longCallSite) -}.getOrElse(Utils.getCallSite()) +val callSite = Utils.getCallSite() +CallSite( + Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm), + Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm) +) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/b34819c7/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala -- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala index 1a9..79077e4 100644
spark git commit: [SPARK-11615] Drop @VisibleForTesting annotation
Repository: spark Updated Branches: refs/heads/branch-1.6 93ac30741 -> fafeca36e [SPARK-11615] Drop @VisibleForTesting annotation See http://search-hadoop.com/m/q3RTtjpe8r1iRbTj2 for discussion. Summary: addition of VisibleForTesting annotation resulted in spark-shell malfunctioning. Author: tedyuCloses #9585 from tedyu/master. (cherry picked from commit 900917541651abe7125f0d205085d2ab6a00d92c) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fafeca36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fafeca36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fafeca36 Branch: refs/heads/branch-1.6 Commit: fafeca36eaa04e6e54bcab863959cce710698e30 Parents: 93ac307 Author: tedyu Authored: Tue Nov 10 16:52:26 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:53:09 2015 -0800 -- core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala | 8 .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2 -- .../org/apache/spark/util/AsynchronousListenerBus.scala | 5 ++--- .../org/apache/spark/util/collection/ExternalSorter.scala| 3 +-- scalastyle-config.xml| 7 +++ .../org/apache/spark/sql/execution/QueryExecution.scala | 3 --- .../apache/spark/network/shuffle/ShuffleTestAccessor.scala | 1 - 7 files changed, 14 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fafeca36/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index c72b588..464027f 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -21,8 +21,6 @@ import javax.annotation.concurrent.GuardedBy import scala.util.control.NonFatal -import com.google.common.annotations.VisibleForTesting - import org.apache.spark.{Logging, SparkException} import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint} @@ -193,8 +191,10 @@ private[netty] class Inbox( def isEmpty: Boolean = inbox.synchronized { messages.isEmpty } - /** Called when we are dropping a message. Test cases override this to test message dropping. */ - @VisibleForTesting + /** + * Called when we are dropping a message. Test cases override this to test message dropping. + * Exposed for testing. 
+ */ protected def onDrop(message: InboxMessage): Unit = { logWarning(s"Drop $message because $endpointRef is stopped") } http://git-wip-us.apache.org/repos/asf/spark/blob/fafeca36/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 77d034f..ca37829 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,8 +21,6 @@ import java.util.concurrent.TimeoutException import scala.collection.mutable.{HashMap, HashSet, ListBuffer} -import com.google.common.annotations.VisibleForTesting - import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics http://git-wip-us.apache.org/repos/asf/spark/blob/fafeca36/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index b8481ea..b3b54af 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -20,7 +20,6 @@ package org.apache.spark.util import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean -import com.google.common.annotations.VisibleForTesting import org.apache.spark.SparkContext /** @@ -119,8 +118,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri * For testing only. Wait until there are no more events in the queue, or until the specified * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue * emptied. + * Exposed for testing. */ - @VisibleForTesting
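Editor's note: the pattern that replaces the annotation is just a doc comment plus ordinary visibility, as in the `Inbox.onDrop` change above. A stripped-down sketch of the same idea with hypothetical class names (not Spark code):

```scala
// Production side: widen visibility only as far as needed and document why.
class EventSink {
  /**
   * Called when an event is dropped. Test cases override this to observe drops.
   * Exposed for testing.
   */
  protected def onDrop(event: String): Unit = ()

  def offer(event: String, accept: Boolean): Unit = if (!accept) onDrop(event)
}

// Test side: override the hook instead of relying on a compile-time-only annotation.
class RecordingSink extends EventSink {
  val dropped = scala.collection.mutable.ArrayBuffer.empty[String]
  override protected def onDrop(event: String): Unit = dropped += event
}
```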
spark git commit: [SPARK-9241][SQL] Supporting multiple DISTINCT columns - follow-up (3)
Repository: spark Updated Branches: refs/heads/master 3121e7816 -> 21c562fa0 [SPARK-9241][SQL] Supporting multiple DISTINCT columns - follow-up (3) This PR is a 2nd follow-up for [SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the following improvements: * Fix for a potential bug in distinct child expression and attribute alignment. * Improved handling of duplicate distinct child expressions. * Added test for distinct UDAF with multiple children. cc yhuai Author: Herman van HovellCloses #9566 from hvanhovell/SPARK-9241-followup-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21c562fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21c562fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21c562fa Branch: refs/heads/master Commit: 21c562fa03430365f5c2b7d6de1f8f60ab2140d4 Parents: 3121e78 Author: Herman van Hovell Authored: Tue Nov 10 16:28:21 2015 -0800 Committer: Yin Huai Committed: Tue Nov 10 16:28:21 2015 -0800 -- .../analysis/DistinctAggregationRewriter.scala | 9 +++-- .../hive/execution/AggregationQuerySuite.scala | 41 ++-- 2 files changed, 42 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21c562fa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 397eff0..c0c9604 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -151,11 +151,12 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP } // Setup unique distinct aggregate children. - val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq - val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair).toMap - val distinctAggChildAttrs = distinctAggChildAttrMap.values.toSeq + val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct + val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair) + val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2) // Setup expand & aggregate operators for distinct aggregate expressions. 
+ val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map { case ((group, expressions), i) => val id = Literal(i + 1) @@ -170,7 +171,7 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP val operators = expressions.map { e => val af = e.aggregateFunction val naf = patchAggregateFunctionChildren(af) { x => - evalWithinGroup(id, distinctAggChildAttrMap(x)) + evalWithinGroup(id, distinctAggChildAttrLookup(x)) } (e, e.copy(aggregateFunction = naf, isDistinct = false)) } http://git-wip-us.apache.org/repos/asf/spark/blob/21c562fa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 6bf2c53..8253921 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -66,6 +66,36 @@ class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFun } } +class LongProductSum extends UserDefinedAggregateFunction { + def inputSchema: StructType = new StructType() +.add("a", LongType) +.add("b", LongType) + + def bufferSchema: StructType = new StructType() +.add("product", LongType) + + def dataType: DataType = LongType + + def deterministic: Boolean = true + + def initialize(buffer: MutableAggregationBuffer): Unit = { +buffer(0) = 0L + } + + def update(buffer: MutableAggregationBuffer, input: Row): Unit = { +if (!(input.isNullAt(0) || input.isNullAt(1))) { + buffer(0) = buffer.getLong(0) + input.getLong(0) * input.getLong(1) +} + } + + def merge(buffer1:
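Editor's note: a hedged sketch of how the new `LongProductSum` UDAF would be exercised with DISTINCT children, in the spirit of the test this patch adds. It assumes the `LongProductSum` class from the diff above is on the classpath and that `sc`/`sqlContext` exist (e.g. in spark-shell); the table and column names are made up:

```scala
import sqlContext.implicits._

sqlContext.udf.register("longProductSum", new LongProductSum)

val df = sc.parallelize(Seq(
  ("x", 1L, 2L), ("x", 1L, 2L),   // duplicate (a, b) pair -- counted once under DISTINCT
  ("x", 3L, 4L),
  ("y", 5L, 6L))).toDF("key", "a", "b")
df.registerTempTable("pairs")

// DISTINCT applies to the pair of children (a, b); duplicate pairs contribute only once.
sqlContext.sql(
  "SELECT key, longProductSum(DISTINCT a, b) AS p FROM pairs GROUP BY key").show()
// Expected under those semantics: x -> 1*2 + 3*4 = 14, y -> 5*6 = 30
```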
spark git commit: [SPARK-11566] [MLLIB] [PYTHON] Refactoring GaussianMixtureModel.gaussians in Python
Repository: spark Updated Branches: refs/heads/branch-1.6 9ccd1bb80 -> d8bfc025c [SPARK-11566] [MLLIB] [PYTHON] Refactoring GaussianMixtureModel.gaussians in Python cc jkbradley Author: Yu ISHIKAWACloses #9534 from yu-iskw/SPARK-11566. (cherry picked from commit c0e48dfa611fa5d94132af7e6f6731f60ab833da) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8bfc025 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8bfc025 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8bfc025 Branch: refs/heads/branch-1.6 Commit: d8bfc025c0855af0e7dcd1b9b1efdb43e3fba794 Parents: 9ccd1bb Author: Yu ISHIKAWA Authored: Tue Nov 10 16:42:28 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 16:44:45 2015 -0800 -- .../python/GaussianMixtureModelWrapper.scala| 21 ++-- python/pyspark/mllib/clustering.py | 2 +- 2 files changed, 7 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8bfc025/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala index 0ec88ef..6a3b20c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala @@ -17,14 +17,11 @@ package org.apache.spark.mllib.api.python -import java.util.{List => JList} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters import org.apache.spark.SparkContext -import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrix} import org.apache.spark.mllib.clustering.GaussianMixtureModel +import org.apache.spark.mllib.linalg.{Vector, Vectors} /** * Wrapper around GaussianMixtureModel to provide helper methods in Python @@ -36,17 +33,11 @@ private[python] class GaussianMixtureModelWrapper(model: GaussianMixtureModel) { /** * Returns gaussians as a List of Vectors and Matrices corresponding each MultivariateGaussian */ - val gaussians: JList[Object] = { -val modelGaussians = model.gaussians -var i = 0 -var mu = ArrayBuffer.empty[Vector] -var sigma = ArrayBuffer.empty[Matrix] -while (i < k) { - mu += modelGaussians(i).mu - sigma += modelGaussians(i).sigma - i += 1 + val gaussians: Array[Byte] = { +val modelGaussians = model.gaussians.map { gaussian => + Array[Any](gaussian.mu, gaussian.sigma) } -List(mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava +SerDe.dumps(JavaConverters.seqAsJavaListConverter(modelGaussians).asJava) } def save(sc: SparkContext, path: String): Unit = model.save(sc, path) http://git-wip-us.apache.org/repos/asf/spark/blob/d8bfc025/python/pyspark/mllib/clustering.py -- diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 1fa061d..c9e6f1d 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -266,7 +266,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): """ return [ MultivariateGaussian(gaussian[0], gaussian[1]) -for gaussian in zip(*self.call("gaussians"))] +for gaussian in self.call("gaussians")] @property @since('1.4.0') - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR] Fix typo in AggregationQuerySuite.scala
Repository: spark Updated Branches: refs/heads/master 6600786dd -> 12c7635dc [MINOR] Fix typo in AggregationQuerySuite.scala Author: Forest FangCloses #9357 from saurfang/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12c7635d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12c7635d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12c7635d Branch: refs/heads/master Commit: 12c7635dc025239d3b69b9adef2f4eebb28edf48 Parents: 6600786 Author: Forest Fang Authored: Tue Nov 10 16:55:58 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:56:06 2015 -0800 -- .../apache/spark/sql/hive/execution/AggregationQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/12c7635d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 8253921..22d2aef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -795,14 +795,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te val df = sqlContext.createDataFrame(rdd, schema) val allColumns = df.schema.fields.map(f => col(f.name)) - val expectedAnaswer = + val expectedAnswer = data .find(r => r.getInt(0) == 50) .getOrElse(fail("A row with id 50 should be the expected answer.")) checkAnswer( df.groupBy().agg(udaf(allColumns: _*)), // udaf returns a Row as the output value. -Row(expectedAnaswer) +Row(expectedAnswer) ) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6726][ML] Import/export for spark.ml LogisticRegressionModel
Repository: spark Updated Branches: refs/heads/branch-1.6 b34819c7b -> bc3bfa04d [SPARK-6726][ML] Import/export for spark.ml LogisticRegressionModel This PR adds model save/load for spark.ml's LogisticRegressionModel. It also does minor refactoring of the default save/load classes to reuse code. CC: mengxr Author: Joseph K. BradleyCloses #9606 from jkbradley/logreg-io2. (cherry picked from commit 6e101d2e9d6e08a6a63f7065c1e87a5338f763ea) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc3bfa04 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc3bfa04 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc3bfa04 Branch: refs/heads/branch-1.6 Commit: bc3bfa04d39992b7451c0a15c2f1cb9cc22c15e7 Parents: b34819c Author: Joseph K. Bradley Authored: Tue Nov 10 18:45:48 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 18:45:57 2015 -0800 -- .../ml/classification/LogisticRegression.scala | 68 +- .../org/apache/spark/ml/util/ReadWrite.scala| 74 ++-- .../LogisticRegressionSuite.scala | 17 - .../spark/ml/util/DefaultReadWriteTest.scala| 4 +- 4 files changed, 152 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc3bfa04/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index f5fca68..a88f526 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -21,13 +21,14 @@ import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util._ import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.BLAS._ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics @@ -396,7 +397,7 @@ class LogisticRegressionModel private[ml] ( val coefficients: Vector, val intercept: Double) extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel] - with LogisticRegressionParams { + with LogisticRegressionParams with Writable { @deprecated("Use coefficients instead.", "1.6.0") def weights: Vector = coefficients @@ -510,8 +511,71 @@ class LogisticRegressionModel private[ml] ( // Note: We should use getThreshold instead of $(threshold) since getThreshold is overridden. if (probability(1) > getThreshold) 1 else 0 } + + /** + * Returns a [[Writer]] instance for this ML instance. + * + * For [[LogisticRegressionModel]], this does NOT currently save the training [[summary]]. + * An option to save [[summary]] may be added in the future. 
+ */ + override def write: Writer = new LogisticRegressionWriter(this) +} + + +/** [[Writer]] instance for [[LogisticRegressionModel]] */ +private[classification] class LogisticRegressionWriter(instance: LogisticRegressionModel) + extends Writer with Logging { + + private case class Data( + numClasses: Int, + numFeatures: Int, + intercept: Double, + coefficients: Vector) + + override protected def saveImpl(path: String): Unit = { +// Save metadata and Params +DefaultParamsWriter.saveMetadata(instance, path, sc) +// Save model data: numClasses, numFeatures, intercept, coefficients +val data = Data(instance.numClasses, instance.numFeatures, instance.intercept, + instance.coefficients) +val dataPath = new Path(path, "data").toString + sqlContext.createDataFrame(Seq(data)).write.format("parquet").save(dataPath) + } +} + + +object LogisticRegressionModel extends Readable[LogisticRegressionModel] { + + override def read: Reader[LogisticRegressionModel] = new LogisticRegressionReader + + override def load(path: String): LogisticRegressionModel = read.load(path) } + +private[classification] class LogisticRegressionReader extends Reader[LogisticRegressionModel] { + + /** Checked against metadata when loading model */ + private val className =
spark git commit: [SPARK-6726][ML] Import/export for spark.ml LogisticRegressionModel
Repository: spark Updated Branches: refs/heads/master 745e45d5f -> 6e101d2e9 [SPARK-6726][ML] Import/export for spark.ml LogisticRegressionModel This PR adds model save/load for spark.ml's LogisticRegressionModel. It also does minor refactoring of the default save/load classes to reuse code. CC: mengxr Author: Joseph K. BradleyCloses #9606 from jkbradley/logreg-io2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e101d2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e101d2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e101d2e Branch: refs/heads/master Commit: 6e101d2e9d6e08a6a63f7065c1e87a5338f763ea Parents: 745e45d Author: Joseph K. Bradley Authored: Tue Nov 10 18:45:48 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 18:45:48 2015 -0800 -- .../ml/classification/LogisticRegression.scala | 68 +- .../org/apache/spark/ml/util/ReadWrite.scala| 74 ++-- .../LogisticRegressionSuite.scala | 17 - .../spark/ml/util/DefaultReadWriteTest.scala| 4 +- 4 files changed, 152 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e101d2e/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index f5fca68..a88f526 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -21,13 +21,14 @@ import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util._ import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.BLAS._ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics @@ -396,7 +397,7 @@ class LogisticRegressionModel private[ml] ( val coefficients: Vector, val intercept: Double) extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel] - with LogisticRegressionParams { + with LogisticRegressionParams with Writable { @deprecated("Use coefficients instead.", "1.6.0") def weights: Vector = coefficients @@ -510,8 +511,71 @@ class LogisticRegressionModel private[ml] ( // Note: We should use getThreshold instead of $(threshold) since getThreshold is overridden. if (probability(1) > getThreshold) 1 else 0 } + + /** + * Returns a [[Writer]] instance for this ML instance. + * + * For [[LogisticRegressionModel]], this does NOT currently save the training [[summary]]. + * An option to save [[summary]] may be added in the future. 
+ */ + override def write: Writer = new LogisticRegressionWriter(this) +} + + +/** [[Writer]] instance for [[LogisticRegressionModel]] */ +private[classification] class LogisticRegressionWriter(instance: LogisticRegressionModel) + extends Writer with Logging { + + private case class Data( + numClasses: Int, + numFeatures: Int, + intercept: Double, + coefficients: Vector) + + override protected def saveImpl(path: String): Unit = { +// Save metadata and Params +DefaultParamsWriter.saveMetadata(instance, path, sc) +// Save model data: numClasses, numFeatures, intercept, coefficients +val data = Data(instance.numClasses, instance.numFeatures, instance.intercept, + instance.coefficients) +val dataPath = new Path(path, "data").toString + sqlContext.createDataFrame(Seq(data)).write.format("parquet").save(dataPath) + } +} + + +object LogisticRegressionModel extends Readable[LogisticRegressionModel] { + + override def read: Reader[LogisticRegressionModel] = new LogisticRegressionReader + + override def load(path: String): LogisticRegressionModel = read.load(path) } + +private[classification] class LogisticRegressionReader extends Reader[LogisticRegressionModel] { + + /** Checked against metadata when loading model */ + private val className = "org.apache.spark.ml.classification.LogisticRegressionModel" + + override def load(path: String): LogisticRegressionModel = { +val metadata =
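Editor's note: a hedged usage sketch of the new persistence path added above. Paths and data are hypothetical, `sc`/`sqlContext` are assumed to exist (e.g. in spark-shell), and, per the note in the patch, the training summary is not saved:

```scala
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.mllib.linalg.Vectors
import sqlContext.implicits._

val training = sc.parallelize(Seq(
  (1.0, Vectors.dense(0.0, 1.1)),
  (0.0, Vectors.dense(2.0, 1.0)),
  (1.0, Vectors.dense(0.1, 1.2)))).toDF("label", "features")

val model = new LogisticRegression().setMaxIter(10).fit(training)

// New in this patch: Parquet-backed save/load for the fitted model.
model.write.save("/tmp/lr-model")                       // hypothetical path; must not already exist
val restored = LogisticRegressionModel.load("/tmp/lr-model")
println(restored.coefficients)                          // same coefficients as `model`
```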
spark git commit: [SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply`
Repository: spark Updated Branches: refs/heads/branch-1.6 ce5aba32f -> 9ccd1bb80 [SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply` Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface. Author: Bryan CutlerCloses #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827. (cherry picked from commit a3989058c0938c8c59c278e7d1a766701cfa255b) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ccd1bb8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ccd1bb8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ccd1bb8 Branch: refs/heads/branch-1.6 Commit: 9ccd1bb80d89745a78a97f23c6417d2b52ab2a8e Parents: ce5aba3 Author: Bryan Cutler Authored: Tue Nov 10 16:32:32 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 10 16:32:39 2015 -0800 -- .../apache/spark/deploy/client/AppClient.scala | 33 ++- .../spark/deploy/client/AppClientSuite.scala| 209 +++ 2 files changed, 238 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9ccd1bb8/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea692..3f29da6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -49,8 +49,8 @@ private[spark] class AppClient( private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var endpoint: RpcEndpointRef = null - private var appId: String = null + @volatile private var endpoint: RpcEndpointRef = null + @volatile private var appId: String = null @volatile private var registered = false private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint @@ -77,6 +77,11 @@ private[spark] class AppClient( private val registrationRetryThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") +// A thread pool to perform receive then reply actions in a thread so as not to block the +// event loop. +private val askAndReplyThreadPool = + ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool") + override def onStart(): Unit = { try { registerWithMaster(1) @@ -200,7 +205,7 @@ private[spark] class AppClient( case r: RequestExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](r)) + case Some(m) => askAndReplyAsync(m, context, r) case None => logWarning("Attempted to request executors before registering with Master.") context.reply(false) @@ -208,13 +213,32 @@ private[spark] class AppClient( case k: KillExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](k)) + case Some(m) => askAndReplyAsync(m, context, k) case None => logWarning("Attempted to kill executors before registering with Master.") context.reply(false) } } +private def askAndReplyAsync[T]( +endpointRef: RpcEndpointRef, +context: RpcCallContext, +msg: T): Unit = { + // Create a thread to ask a message and reply with the result. Allow thread to be + // interrupted during shutdown, otherwise context must be notified of NonFatal errors. 
+ askAndReplyThreadPool.execute(new Runnable { +override def run(): Unit = { + try { +context.reply(endpointRef.askWithRetry[Boolean](msg)) + } catch { +case ie: InterruptedException => // Cancelled +case NonFatal(t) => + context.sendFailure(t) + } +} + }) +} + override def onDisconnected(address: RpcAddress): Unit = { if (master.exists(_.address == address)) { logWarning(s"Connection to $address failed; waiting for master to reconnect...") @@ -252,6 +276,7 @@ private[spark] class AppClient( registrationRetryThread.shutdownNow() registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() + askAndReplyThreadPool.shutdownNow() } }
spark git commit: [SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply`
Repository: spark Updated Branches: refs/heads/master 21c562fa0 -> a3989058c [SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply` Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface. Author: Bryan CutlerCloses #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3989058 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3989058 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3989058 Branch: refs/heads/master Commit: a3989058c0938c8c59c278e7d1a766701cfa255b Parents: 21c562f Author: Bryan Cutler Authored: Tue Nov 10 16:32:32 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 10 16:32:32 2015 -0800 -- .../apache/spark/deploy/client/AppClient.scala | 33 ++- .../spark/deploy/client/AppClientSuite.scala| 209 +++ 2 files changed, 238 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a3989058/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea692..3f29da6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -49,8 +49,8 @@ private[spark] class AppClient( private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var endpoint: RpcEndpointRef = null - private var appId: String = null + @volatile private var endpoint: RpcEndpointRef = null + @volatile private var appId: String = null @volatile private var registered = false private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint @@ -77,6 +77,11 @@ private[spark] class AppClient( private val registrationRetryThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") +// A thread pool to perform receive then reply actions in a thread so as not to block the +// event loop. +private val askAndReplyThreadPool = + ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool") + override def onStart(): Unit = { try { registerWithMaster(1) @@ -200,7 +205,7 @@ private[spark] class AppClient( case r: RequestExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](r)) + case Some(m) => askAndReplyAsync(m, context, r) case None => logWarning("Attempted to request executors before registering with Master.") context.reply(false) @@ -208,13 +213,32 @@ private[spark] class AppClient( case k: KillExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](k)) + case Some(m) => askAndReplyAsync(m, context, k) case None => logWarning("Attempted to kill executors before registering with Master.") context.reply(false) } } +private def askAndReplyAsync[T]( +endpointRef: RpcEndpointRef, +context: RpcCallContext, +msg: T): Unit = { + // Create a thread to ask a message and reply with the result. Allow thread to be + // interrupted during shutdown, otherwise context must be notified of NonFatal errors. 
+ askAndReplyThreadPool.execute(new Runnable { +override def run(): Unit = { + try { +context.reply(endpointRef.askWithRetry[Boolean](msg)) + } catch { +case ie: InterruptedException => // Cancelled +case NonFatal(t) => + context.sendFailure(t) + } +} + }) +} + override def onDisconnected(address: RpcAddress): Unit = { if (master.exists(_.address == address)) { logWarning(s"Connection to $address failed; waiting for master to reconnect...") @@ -252,6 +276,7 @@ private[spark] class AppClient( registrationRetryThread.shutdownNow() registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() + askAndReplyThreadPool.shutdownNow() } } http://git-wip-us.apache.org/repos/asf/spark/blob/a3989058/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
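Editor's note: the general pattern in this change is "never block the RPC event loop: hand the blocking round-trip to a side thread pool and complete the reply from there". A self-contained sketch of that idea outside Spark's RPC classes, using hypothetical names and plain java.util.concurrent:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.NonFatal

// Stand-in for Spark's RpcCallContext: something we must eventually reply to.
trait Reply { def reply(v: Boolean): Unit; def fail(t: Throwable): Unit }

object NonBlockingAsk {
  private val pool = Executors.newCachedThreadPool()

  // Called from the single-threaded message loop; must return immediately.
  def askAndReplyAsync(blockingAsk: () => Boolean, ctx: Reply): Unit =
    pool.execute(new Runnable {
      override def run(): Unit =
        try ctx.reply(blockingAsk())          // the potentially slow round-trip happens here
        catch {
          case _: InterruptedException =>     // shutting down: drop the reply
          case NonFatal(t) => ctx.fail(t)     // surface other failures to the caller
        }
    })

  def shutdown(): Unit = { pool.shutdownNow(); pool.awaitTermination(5, TimeUnit.SECONDS) }
}
```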
spark git commit: [SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called
Repository: spark Updated Branches: refs/heads/master 33112f9c4 -> 3e0a6cf1e [SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called As vonnagy reported in the following thread: http://search-hadoop.com/m/q3RTtk982kvIow22 Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler Author: tedyuCloses #9546 from ted-yu/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e0a6cf1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e0a6cf1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e0a6cf1 Branch: refs/heads/master Commit: 3e0a6cf1e02a19b37c68d3026415d53bb57a576b Parents: 33112f9 Author: tedyu Authored: Tue Nov 10 16:51:25 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:51:25 2015 -0800 -- .../org/apache/spark/util/AsynchronousListenerBus.scala | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e0a6cf1/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 61b5a4c..b8481ea 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri processingEvent = true } try { - val event = eventQueue.poll - if (event == null) { + if (stopped.get()) { // Get out of the while loop and shutdown the daemon thread -if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + -" the listener bus has been stopped. So `stopped` must be true") -} return } + val event = eventQueue.poll + assert(event != null, "event queue was empty but the listener bus was not stopped") postToAll(event) } finally { self.synchronized { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
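Editor's note: stripped of Spark specifics, the corrected consumer loop looks like the sketch below -- a hypothetical class, same idea: wake up, check the stop flag first, and only then poll, so `stop()` plus `join()` cannot hang even while producers keep posting events:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

class TinyListenerBus(handle: String => Unit) {
  private val queue = new ConcurrentLinkedQueue[String]()
  private val eventLock = new Semaphore(0)      // one permit per posted event, plus one on stop
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread("tiny-listener-bus") {
    setDaemon(true)
    override def run(): Unit = while (true) {
      eventLock.acquire()
      if (stopped.get()) return                 // exit even if producers are still posting
      val event = queue.poll()
      assert(event != null, "event queue was empty but the bus was not stopped")
      handle(event)
    }
  }
  thread.start()

  def post(event: String): Unit =
    if (!stopped.get()) { queue.offer(event); eventLock.release() }

  def stop(): Unit = { stopped.set(true); eventLock.release(); thread.join() }
}
```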
spark git commit: [SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called
Repository: spark Updated Branches: refs/heads/branch-1.6 10272d5c9 -> 93ac30741 [SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called As vonnagy reported in the following thread: http://search-hadoop.com/m/q3RTtk982kvIow22 Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler Author: tedyuCloses #9546 from ted-yu/master. (cherry picked from commit 3e0a6cf1e02a19b37c68d3026415d53bb57a576b) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ac3074 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ac3074 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ac3074 Branch: refs/heads/branch-1.6 Commit: 93ac30741cd0ed99512f01525fbda8a08c87967a Parents: 10272d5 Author: tedyu Authored: Tue Nov 10 16:51:25 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:51:33 2015 -0800 -- .../org/apache/spark/util/AsynchronousListenerBus.scala | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93ac3074/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 61b5a4c..b8481ea 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri processingEvent = true } try { - val event = eventQueue.poll - if (event == null) { + if (stopped.get()) { // Get out of the while loop and shutdown the daemon thread -if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + -" the listener bus has been stopped. So `stopped` must be true") -} return } + val event = eventQueue.poll + assert(event != null, "event queue was empty but the listener bus was not stopped") postToAll(event) } finally { self.synchronized { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Fix typo in driver page
Repository: spark Updated Branches: refs/heads/branch-1.5 0512960fc -> 6e823b4d7 Fix typo in driver page "Comamnd property" => "Command property" Author: Paul ChandlerCloses #9578 from pestilence669/fix_spelling. (cherry picked from commit 5507a9d0935aa42d65c3a4fa65da680b5af14faf) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e823b4d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e823b4d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e823b4d Branch: refs/heads/branch-1.5 Commit: 6e823b4d7d52e9cf707f144256006e4575d23dc2 Parents: 0512960 Author: Paul Chandler Authored: Tue Nov 10 12:59:53 2015 +0100 Committer: Sean Owen Committed: Tue Nov 10 13:00:18 2015 +0100 -- .../main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e823b4d/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala index e8ef60b..bc67fd4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -46,7 +46,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") val schedulerHeaders = Seq("Scheduler property", "Value") val commandEnvHeaders = Seq("Command environment variable", "Value") val launchedHeaders = Seq("Launched property", "Value") -val commandHeaders = Seq("Comamnd property", "Value") +val commandHeaders = Seq("Command property", "Value") val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count") val driverDescription = Iterable.apply(driverState.description) val submissionState = Iterable.apply(driverState.submissionState) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Fix typo in driver page
Repository: spark Updated Branches: refs/heads/branch-1.6 b426d24db -> 39c1ebcd3

Fix typo in driver page

"Comamnd property" => "Command property"

Author: Paul Chandler

Closes #9578 from pestilence669/fix_spelling.

(cherry picked from commit 5507a9d0935aa42d65c3a4fa65da680b5af14faf) Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39c1ebcd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39c1ebcd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39c1ebcd Branch: refs/heads/branch-1.6 Commit: 39c1ebcd30a163130d52dd8d4762ae4f9f1fde10 Parents: b426d24 Author: Paul Chandler Authored: Tue Nov 10 12:59:53 2015 +0100 Committer: Sean Owen Committed: Tue Nov 10 13:00:05 2015 +0100
--
 .../main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/39c1ebcd/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index e8ef60b..bc67fd4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -46,7 +46,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
     val schedulerHeaders = Seq("Scheduler property", "Value")
     val commandEnvHeaders = Seq("Command environment variable", "Value")
     val launchedHeaders = Seq("Launched property", "Value")
-    val commandHeaders = Seq("Comamnd property", "Value")
+    val commandHeaders = Seq("Command property", "Value")
     val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
     val driverDescription = Iterable.apply(driverState.description)
     val submissionState = Iterable.apply(driverState.submissionState)
spark git commit: Fix typo in driver page
Repository: spark Updated Branches: refs/heads/master 521b3cae1 -> 5507a9d09

Fix typo in driver page

"Comamnd property" => "Command property"

Author: Paul Chandler

Closes #9578 from pestilence669/fix_spelling.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5507a9d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5507a9d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5507a9d0 Branch: refs/heads/master Commit: 5507a9d0935aa42d65c3a4fa65da680b5af14faf Parents: 521b3ca Author: Paul Chandler Authored: Tue Nov 10 12:59:53 2015 +0100 Committer: Sean Owen Committed: Tue Nov 10 12:59:53 2015 +0100
--
 .../main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/5507a9d0/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index e8ef60b..bc67fd4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -46,7 +46,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
     val schedulerHeaders = Seq("Scheduler property", "Value")
     val commandEnvHeaders = Seq("Command environment variable", "Value")
     val launchedHeaders = Seq("Launched property", "Value")
-    val commandHeaders = Seq("Comamnd property", "Value")
+    val commandHeaders = Seq("Command property", "Value")
     val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
     val driverDescription = Iterable.apply(driverState.description)
     val submissionState = Iterable.apply(driverState.submissionState)
[2/2] spark git commit: [SPARK-11290][STREAMING] Basic implementation of trackStateByKey
[SPARK-11290][STREAMING] Basic implementation of trackStateByKey

The current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an update function. The update function is called for each key, and it uses the new data and the existing state of the key to generate an updated state. However, based on community feedback, we have learned the following lessons:

* Need for more optimized state management that does not scan every key
* Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state

The high-level idea of this PR:

* Introduce a new API, trackStateByKey, that allows the user to update per-key state and emit arbitrary records. The new API is necessary because it has significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts.
* Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will look up the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the updated data and, if necessary, the old data.

Here is the detailed design doc. Please take a look and provide feedback as comments. https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em

This is still WIP. Major things left to be done:
- [x] Implement basic functionality of state tracking, with initial RDD and timeouts
- [x] Unit tests for state tracking
- [x] Unit tests for initial RDD and timeout
- [ ] Unit tests for TrackStateRDD
  - [x] state creating, updating, removing
  - [ ] emitting
  - [ ] checkpointing
- [x] Misc unit tests for State, TrackStateSpec, etc.
- [x] Update docs and experimental tags

Author: Tathagata Das

Closes #9256 from tdas/trackStateByKey.
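To make the proposed semantics concrete, here is a rough usage sketch. It is a hedged illustration rather than code from this patch: the tracking-function shape mirrors the one used in TrackStateRDDSuite further down, while the exact StateSpec.function factory and the trackStateByKey call are assumed from the files listed in the diff.

```scala
import org.apache.spark.streaming.{State, StateSpec, Time}
import org.apache.spark.streaming.dstream.DStream

// Per-key tracking function: (batch time, key, new value in this batch, mutable
// state handle) => record to emit. Mirrors the shape used in the test suite.
val trackingFunc = (time: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + (if (state.exists()) state.get else 0)
  state.update(sum)   // create or update the per-key state
  Some((word, sum))   // emit an arbitrary record instead of the state itself
}

// Assumed wiring on a DStream of (word, count) pairs; `wordPairs` is hypothetical.
def runningCounts(wordPairs: DStream[(String, Int)]) =
  wordPairs.trackStateByKey(StateSpec.function(trackingFunc))
```

Unlike updateStateByKey, only keys that receive data in a batch have their tracking function invoked, and the emitted records need not be the state values themselves.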
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99f5f988 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99f5f988 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99f5f988 Branch: refs/heads/master Commit: 99f5f988612b3093d73d9ce98819767e822fcbff Parents: bd70244 Author: Tathagata Das Authored: Tue Nov 10 23:16:18 2015 -0800 Committer: Tathagata Das Committed: Tue Nov 10 23:16:18 2015 -0800 -- .../streaming/StatefulNetworkWordCount.scala| 25 +- .../org/apache/spark/streaming/State.scala | 193 .../org/apache/spark/streaming/StateSpec.scala | 212 .../dstream/PairDStreamFunctions.scala | 46 +- .../streaming/dstream/TrackStateDStream.scala | 142 ++ .../spark/streaming/rdd/TrackStateRDD.scala | 188 +++ .../apache/spark/streaming/util/StateMap.scala | 337 + .../apache/spark/streaming/StateMapSuite.scala | 314 .../spark/streaming/TrackStateByKeySuite.scala | 494 +++ .../streaming/rdd/TrackStateRDDSuite.scala | 193 10 files changed, 2125 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/99f5f988/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index 02ba1c2..be2ae0b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -44,18 +44,6 @@ object StatefulNetworkWordCount { StreamingExamples.setStreamingLogLevels() -val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.sum - - val previousCount = state.getOrElse(0) - - Some(currentCount + previousCount) -} - -val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { - iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) -} - val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) @@ -71,9 +59,16 @@ object StatefulNetworkWordCount { val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using updateStateByKey -// This will give a Dstream made of state (which is
[1/2] spark git commit: [SPARK-11290][STREAMING] Basic implementation of trackStateByKey
Repository: spark Updated Branches: refs/heads/branch-1.6 85bc72908 -> daa74be6f http://git-wip-us.apache.org/repos/asf/spark/blob/daa74be6/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala new file mode 100644 index 000..fc5f266 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.rdd + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Time, State} +import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, SparkFunSuite} + +class TrackStateRDDSuite extends SparkFunSuite with BeforeAndAfterAll { + + private var sc = new SparkContext( +new SparkConf().setMaster("local").setAppName("TrackStateRDDSuite")) + + override def afterAll(): Unit = { +sc.stop() + } + + test("creation from pair RDD") { +val data = Seq((1, "1"), (2, "2"), (3, "3")) +val partitioner = new HashPartitioner(10) +val rdd = TrackStateRDD.createFromPairRDD[Int, Int, String, Int]( + sc.parallelize(data), partitioner, Time(123)) +assertRDD[Int, Int, String, Int](rdd, data.map { x => (x._1, x._2, 123)}.toSet, Set.empty) +assert(rdd.partitions.size === partitioner.numPartitions) + +assert(rdd.partitioner === Some(partitioner)) + } + + test("states generated by TrackStateRDD") { +val initStates = Seq(("k1", 0), ("k2", 0)) +val initTime = 123 +val initStateWthTime = initStates.map { x => (x._1, x._2, initTime) }.toSet +val partitioner = new HashPartitioner(2) +val initStateRDD = TrackStateRDD.createFromPairRDD[String, Int, Int, Int]( + sc.parallelize(initStates), partitioner, Time(initTime)).persist() +assertRDD(initStateRDD, initStateWthTime, Set.empty) + +val updateTime = 345 + +/** + * Test that the test state RDD, when operated with new data, + * creates a new state RDD with expected states + */ +def testStateUpdates( +testStateRDD: TrackStateRDD[String, Int, Int, Int], +testData: Seq[(String, Int)], +expectedStates: Set[(String, Int, Int)]): TrackStateRDD[String, Int, Int, Int] = { + + // Persist the test TrackStateRDD so that its not recomputed while doing the next operation. + // This is to make sure that we only track which state keys are being touched in the next op. 
+ testStateRDD.persist().count() + + // To track which keys are being touched + TrackStateRDDSuite.touchedStateKeys.clear() + + val trackingFunc = (time: Time, key: String, data: Option[Int], state: State[Int]) => { + +// Track the key that has been touched +TrackStateRDDSuite.touchedStateKeys += key + +// If the data is 0, do not do anything with the state +// else if the data is 1, increment the state if it exists, or set new state to 0 +// else if the data is 2, remove the state if it exists +data match { + case Some(1) => +if (state.exists()) { state.update(state.get + 1) } +else state.update(0) + case Some(2) => +state.remove() + case _ => +} +None.asInstanceOf[Option[Int]] // Do not return anything, not being tested + } + val newDataRDD = sc.makeRDD(testData).partitionBy(testStateRDD.partitioner.get) + + // Assert that the new state RDD has expected state data + val newStateRDD = assertOperation( +testStateRDD, newDataRDD, trackingFunc, updateTime, expectedStates, Set.empty) + + // Assert that the function was called only for the keys present in the data + assert(TrackStateRDDSuite.touchedStateKeys.size === testData.size, +"More number of keys are being touched than that is expected") +
[1/2] spark git commit: [SPARK-11290][STREAMING] Basic implementation of trackStateByKey
Repository: spark Updated Branches: refs/heads/master bd70244b3 -> 99f5f9886 http://git-wip-us.apache.org/repos/asf/spark/blob/99f5f988/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala new file mode 100644 index 000..fc5f266 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.rdd + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Time, State} +import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, SparkFunSuite} + +class TrackStateRDDSuite extends SparkFunSuite with BeforeAndAfterAll { + + private var sc = new SparkContext( +new SparkConf().setMaster("local").setAppName("TrackStateRDDSuite")) + + override def afterAll(): Unit = { +sc.stop() + } + + test("creation from pair RDD") { +val data = Seq((1, "1"), (2, "2"), (3, "3")) +val partitioner = new HashPartitioner(10) +val rdd = TrackStateRDD.createFromPairRDD[Int, Int, String, Int]( + sc.parallelize(data), partitioner, Time(123)) +assertRDD[Int, Int, String, Int](rdd, data.map { x => (x._1, x._2, 123)}.toSet, Set.empty) +assert(rdd.partitions.size === partitioner.numPartitions) + +assert(rdd.partitioner === Some(partitioner)) + } + + test("states generated by TrackStateRDD") { +val initStates = Seq(("k1", 0), ("k2", 0)) +val initTime = 123 +val initStateWthTime = initStates.map { x => (x._1, x._2, initTime) }.toSet +val partitioner = new HashPartitioner(2) +val initStateRDD = TrackStateRDD.createFromPairRDD[String, Int, Int, Int]( + sc.parallelize(initStates), partitioner, Time(initTime)).persist() +assertRDD(initStateRDD, initStateWthTime, Set.empty) + +val updateTime = 345 + +/** + * Test that the test state RDD, when operated with new data, + * creates a new state RDD with expected states + */ +def testStateUpdates( +testStateRDD: TrackStateRDD[String, Int, Int, Int], +testData: Seq[(String, Int)], +expectedStates: Set[(String, Int, Int)]): TrackStateRDD[String, Int, Int, Int] = { + + // Persist the test TrackStateRDD so that its not recomputed while doing the next operation. + // This is to make sure that we only track which state keys are being touched in the next op. 
+ testStateRDD.persist().count() + + // To track which keys are being touched + TrackStateRDDSuite.touchedStateKeys.clear() + + val trackingFunc = (time: Time, key: String, data: Option[Int], state: State[Int]) => { + +// Track the key that has been touched +TrackStateRDDSuite.touchedStateKeys += key + +// If the data is 0, do not do anything with the state +// else if the data is 1, increment the state if it exists, or set new state to 0 +// else if the data is 2, remove the state if it exists +data match { + case Some(1) => +if (state.exists()) { state.update(state.get + 1) } +else state.update(0) + case Some(2) => +state.remove() + case _ => +} +None.asInstanceOf[Option[Int]] // Do not return anything, not being tested + } + val newDataRDD = sc.makeRDD(testData).partitionBy(testStateRDD.partitioner.get) + + // Assert that the new state RDD has expected state data + val newStateRDD = assertOperation( +testStateRDD, newDataRDD, trackingFunc, updateTime, expectedStates, Set.empty) + + // Assert that the function was called only for the keys present in the data + assert(TrackStateRDDSuite.touchedStateKeys.size === testData.size, +"More number of keys are being touched than that is expected") +
[2/2] spark git commit: [SPARK-11290][STREAMING] Basic implementation of trackStateByKey
[SPARK-11290][STREAMING] Basic implementation of trackStateByKey

The current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an update function. The update function is called for each key, and it uses the new data and the existing state of the key to generate an updated state. However, based on community feedback, we have learned the following lessons:

* Need for more optimized state management that does not scan every key
* Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state

The high-level idea of this PR:

* Introduce a new API, trackStateByKey, that allows the user to update per-key state and emit arbitrary records. The new API is necessary because it has significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts.
* Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will look up the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the updated data and, if necessary, the old data.

Here is the detailed design doc. Please take a look and provide feedback as comments. https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em

This is still WIP. Major things left to be done:
- [x] Implement basic functionality of state tracking, with initial RDD and timeouts
- [x] Unit tests for state tracking
- [x] Unit tests for initial RDD and timeout
- [ ] Unit tests for TrackStateRDD
  - [x] state creating, updating, removing
  - [ ] emitting
  - [ ] checkpointing
- [x] Misc unit tests for State, TrackStateSpec, etc.
- [x] Update docs and experimental tags

Author: Tathagata Das

Closes #9256 from tdas/trackStateByKey.
(cherry picked from commit 99f5f988612b3093d73d9ce98819767e822fcbff) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/daa74be6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/daa74be6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/daa74be6 Branch: refs/heads/branch-1.6 Commit: daa74be6f863061221bb0c2f94e70672e6fcbeaa Parents: 85bc729 Author: Tathagata Das Authored: Tue Nov 10 23:16:18 2015 -0800 Committer: Tathagata Das Committed: Tue Nov 10 23:16:37 2015 -0800 -- .../streaming/StatefulNetworkWordCount.scala| 25 +- .../org/apache/spark/streaming/State.scala | 193 .../org/apache/spark/streaming/StateSpec.scala | 212 .../dstream/PairDStreamFunctions.scala | 46 +- .../streaming/dstream/TrackStateDStream.scala | 142 ++ .../spark/streaming/rdd/TrackStateRDD.scala | 188 +++ .../apache/spark/streaming/util/StateMap.scala | 337 + .../apache/spark/streaming/StateMapSuite.scala | 314 .../spark/streaming/TrackStateByKeySuite.scala | 494 +++ .../streaming/rdd/TrackStateRDDSuite.scala | 193 10 files changed, 2125 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/daa74be6/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index 02ba1c2..be2ae0b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -44,18 +44,6 @@ object StatefulNetworkWordCount { StreamingExamples.setStreamingLogLevels() -val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.sum - - val previousCount = state.getOrElse(0) - - Some(currentCount + previousCount) -} - -val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { - iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) -} - val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) @@ -71,9 +59,16 @@ object StatefulNetworkWordCount { val wordDstream =
spark git commit: [SPARK-11615] Drop @VisibleForTesting annotation
Repository: spark Updated Branches: refs/heads/master 3e0a6cf1e -> 900917541 [SPARK-11615] Drop @VisibleForTesting annotation See http://search-hadoop.com/m/q3RTtjpe8r1iRbTj2 for discussion. Summary: addition of VisibleForTesting annotation resulted in spark-shell malfunctioning. Author: tedyuCloses #9585 from tedyu/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90091754 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90091754 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90091754 Branch: refs/heads/master Commit: 900917541651abe7125f0d205085d2ab6a00d92c Parents: 3e0a6cf Author: tedyu Authored: Tue Nov 10 16:52:26 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:52:59 2015 -0800 -- core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala | 8 .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2 -- .../org/apache/spark/util/AsynchronousListenerBus.scala | 5 ++--- .../org/apache/spark/util/collection/ExternalSorter.scala| 3 +-- scalastyle-config.xml| 7 +++ .../org/apache/spark/sql/execution/QueryExecution.scala | 3 --- .../apache/spark/network/shuffle/ShuffleTestAccessor.scala | 1 - 7 files changed, 14 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90091754/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index c72b588..464027f 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -21,8 +21,6 @@ import javax.annotation.concurrent.GuardedBy import scala.util.control.NonFatal -import com.google.common.annotations.VisibleForTesting - import org.apache.spark.{Logging, SparkException} import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint} @@ -193,8 +191,10 @@ private[netty] class Inbox( def isEmpty: Boolean = inbox.synchronized { messages.isEmpty } - /** Called when we are dropping a message. Test cases override this to test message dropping. */ - @VisibleForTesting + /** + * Called when we are dropping a message. Test cases override this to test message dropping. + * Exposed for testing. 
+ */ protected def onDrop(message: InboxMessage): Unit = { logWarning(s"Drop $message because $endpointRef is stopped") } http://git-wip-us.apache.org/repos/asf/spark/blob/90091754/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 77d034f..ca37829 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,8 +21,6 @@ import java.util.concurrent.TimeoutException import scala.collection.mutable.{HashMap, HashSet, ListBuffer} -import com.google.common.annotations.VisibleForTesting - import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics http://git-wip-us.apache.org/repos/asf/spark/blob/90091754/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index b8481ea..b3b54af 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -20,7 +20,6 @@ package org.apache.spark.util import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean -import com.google.common.annotations.VisibleForTesting import org.apache.spark.SparkContext /** @@ -119,8 +118,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri * For testing only. Wait until there are no more events in the queue, or until the specified * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue * emptied. + * Exposed for testing. */ - @VisibleForTesting @throws(classOf[TimeoutException]) def waitUntilEmpty(timeoutMillis: Long): Unit = { val finishTime = System.currentTimeMillis +
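The convention adopted above is to mark test-only hooks with an "Exposed for testing" doc comment instead of Guava's @VisibleForTesting annotation. A tiny hypothetical sketch of the same pattern (class and method names invented, not from this patch):

```scala
// Production class: the hook is documented as test-only rather than
// annotated with @VisibleForTesting.
class MessageProcessor {
  /**
   * Called when a message is dropped.
   * Exposed for testing.
   */
  protected def onDrop(message: String): Unit = ()

  def process(message: String, accept: Boolean): Unit =
    if (accept) println(s"handled: $message") else onDrop(message)
}

// Test code can still override the hook to observe dropped messages.
class RecordingProcessor extends MessageProcessor {
  var dropped: List[String] = Nil
  override protected def onDrop(message: String): Unit = dropped ::= message
}
```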
spark git commit: [MINOR] License header formatting fix
Repository: spark Updated Branches: refs/heads/master 12c7635dc -> 745e45d5f [MINOR] License header formatting fix The header wasn't indented properly. Author: Marc Prud'hommeauxCloses #9312 from mprudhom/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/745e45d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/745e45d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/745e45d5 Branch: refs/heads/master Commit: 745e45d5ff7fe251c0d5197b7e08b1f80807b005 Parents: 12c7635 Author: Marc Prud'hommeaux Authored: Tue Nov 10 16:57:08 2015 -0800 Committer: Andrew Or Committed: Tue Nov 10 16:57:12 2015 -0800 -- .../datasources/jdbc/DefaultSource.scala| 30 ++-- 1 file changed, 15 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/745e45d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala index 6773afc..f522303 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -*http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql.execution.datasources.jdbc - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10192][HOTFIX] Fix NPE in test that was added in #8402
Repository: spark Updated Branches: refs/heads/master 6e101d2e9 -> fac53d8ec

[SPARK-10192][HOTFIX] Fix NPE in test that was added in #8402

This fixes an NPE introduced in SPARK-10192 / #8402.

Author: Josh Rosen

Closes #9620 from JoshRosen/SPARK-10192-hotfix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fac53d8e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fac53d8e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fac53d8e Branch: refs/heads/master Commit: fac53d8ec015e27d034dfe30ed8ce7d83f07efa6 Parents: 6e101d2 Author: Josh Rosen Authored: Tue Nov 10 22:24:00 2015 -0800 Committer: Josh Rosen Committed: Tue Nov 10 22:24:00 2015 -0800
--
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/fac53d8e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 068b49b..4d6b254 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1070,7 +1070,7 @@ class DAGSchedulerSuite
 
     // Run the first job successfully, which creates one shuffle dependency
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
-    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
 
spark git commit: [SPARK-10192][HOTFIX] Fix NPE in test that was added in #8402
Repository: spark Updated Branches: refs/heads/branch-1.6 bc3bfa04d -> d9a39048f

[SPARK-10192][HOTFIX] Fix NPE in test that was added in #8402

This fixes an NPE introduced in SPARK-10192 / #8402.

Author: Josh Rosen

Closes #9620 from JoshRosen/SPARK-10192-hotfix.

(cherry picked from commit fac53d8ec015e27d034dfe30ed8ce7d83f07efa6) Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9a39048 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9a39048 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9a39048 Branch: refs/heads/branch-1.6 Commit: d9a39048f9d2ade54828a4b629cebcebd9f9350d Parents: bc3bfa0 Author: Josh Rosen Authored: Tue Nov 10 22:24:00 2015 -0800 Committer: Josh Rosen Committed: Tue Nov 10 22:24:23 2015 -0800
--
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/d9a39048/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 068b49b..4d6b254 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1070,7 +1070,7 @@ class DAGSchedulerSuite
 
     // Run the first job successfully, which creates one shuffle dependency
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
-    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
 
spark git commit: [SPARK-11468] [SPARKR] add stddev/variance agg functions for Column
Repository: spark Updated Branches: refs/heads/branch-1.6 d9a39048f -> 649a0a357 [SPARK-11468] [SPARKR] add stddev/variance agg functions for Column Checked names, none of them should conflict with anything in base shivaram davies rxin Author: felixcheungCloses #9489 from felixcheung/rstddev. (cherry picked from commit 1a8e0468a1c07e99ad395eb0e4dc072c5cf7393a) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649a0a35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649a0a35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649a0a35 Branch: refs/heads/branch-1.6 Commit: 649a0a3570424afea85e528d3ad691ba923f3a25 Parents: d9a3904 Author: felixcheung Authored: Tue Nov 10 22:45:17 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 22:45:50 2015 -0800 -- R/pkg/NAMESPACE | 10 ++ R/pkg/R/functions.R | 186 +++--- R/pkg/R/generics.R | 40 R/pkg/R/group.R | 8 +- R/pkg/inst/tests/test_sparkSQL.R | 83 --- 5 files changed, 297 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/649a0a35/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 52fd6c9..2ee7d6f 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -155,6 +155,7 @@ exportMethods("%in%", "isNaN", "isNotNull", "isNull", + "kurtosis", "lag", "last", "last_day", @@ -207,12 +208,17 @@ exportMethods("%in%", "shiftLeft", "shiftRight", "shiftRightUnsigned", + "sd", "sign", "signum", "sin", "sinh", "size", + "skewness", "soundex", + "stddev", + "stddev_pop", + "stddev_samp", "sqrt", "startsWith", "substr", @@ -231,6 +237,10 @@ exportMethods("%in%", "unhex", "unix_timestamp", "upper", + "var", + "variance", + "var_pop", + "var_samp", "weekofyear", "when", "year") http://git-wip-us.apache.org/repos/asf/spark/blob/649a0a35/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 0b28087..3d0255a 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -520,6 +520,22 @@ setMethod("isNaN", column(jc) }) +#' kurtosis +#' +#' Aggregate function: returns the kurtosis of the values in a group. +#' +#' @rdname kurtosis +#' @name kurtosis +#' @family agg_funcs +#' @export +#' @examples \dontrun{kurtosis(df$c)} +setMethod("kurtosis", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "kurtosis", x@jc) +column(jc) + }) + #' last #' #' Aggregate function: returns the last value in a group. @@ -861,6 +877,28 @@ setMethod("rtrim", column(jc) }) +#' sd +#' +#' Aggregate function: alias for \link{stddev_samp} +#' +#' @rdname sd +#' @name sd +#' @family agg_funcs +#' @seealso \link{stddev_pop}, \link{stddev_samp} +#' @export +#' @examples +#'\dontrun{ +#'stddev(df$c) +#'select(df, stddev(df$age)) +#'agg(df, sd(df$age)) +#'} +setMethod("sd", + signature(x = "Column"), + function(x, na.rm = FALSE) { +# In R, sample standard deviation is calculated with the sd() function. +stddev_samp(x) + }) + #' second #' #' Extracts the seconds as an integer from a given date/timestamp/string. @@ -958,6 +996,22 @@ setMethod("size", column(jc) }) +#' skewness +#' +#' Aggregate function: returns the skewness of the values in a group. 
+#' +#' @rdname skewness +#' @name skewness +#' @family agg_funcs +#' @export +#' @examples \dontrun{skewness(df$c)} +setMethod("skewness", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "skewness", x@jc) +column(jc) + }) + #' soundex #' #' Return the soundex code for the specified expression. @@ -974,6 +1028,49 @@ setMethod("soundex", column(jc) }) +#' @rdname sd +#' @name stddev +setMethod("stddev", + signature(x = "Column"), +
spark git commit: [SPARK-11468] [SPARKR] add stddev/variance agg functions for Column
Repository: spark Updated Branches: refs/heads/master fac53d8ec -> 1a8e0468a [SPARK-11468] [SPARKR] add stddev/variance agg functions for Column Checked names, none of them should conflict with anything in base shivaram davies rxin Author: felixcheungCloses #9489 from felixcheung/rstddev. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a8e0468 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a8e0468 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a8e0468 Branch: refs/heads/master Commit: 1a8e0468a1c07e99ad395eb0e4dc072c5cf7393a Parents: fac53d8 Author: felixcheung Authored: Tue Nov 10 22:45:17 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 22:45:17 2015 -0800 -- R/pkg/NAMESPACE | 10 ++ R/pkg/R/functions.R | 186 +++--- R/pkg/R/generics.R | 40 R/pkg/R/group.R | 8 +- R/pkg/inst/tests/test_sparkSQL.R | 83 --- 5 files changed, 297 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a8e0468/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 52fd6c9..2ee7d6f 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -155,6 +155,7 @@ exportMethods("%in%", "isNaN", "isNotNull", "isNull", + "kurtosis", "lag", "last", "last_day", @@ -207,12 +208,17 @@ exportMethods("%in%", "shiftLeft", "shiftRight", "shiftRightUnsigned", + "sd", "sign", "signum", "sin", "sinh", "size", + "skewness", "soundex", + "stddev", + "stddev_pop", + "stddev_samp", "sqrt", "startsWith", "substr", @@ -231,6 +237,10 @@ exportMethods("%in%", "unhex", "unix_timestamp", "upper", + "var", + "variance", + "var_pop", + "var_samp", "weekofyear", "when", "year") http://git-wip-us.apache.org/repos/asf/spark/blob/1a8e0468/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 0b28087..3d0255a 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -520,6 +520,22 @@ setMethod("isNaN", column(jc) }) +#' kurtosis +#' +#' Aggregate function: returns the kurtosis of the values in a group. +#' +#' @rdname kurtosis +#' @name kurtosis +#' @family agg_funcs +#' @export +#' @examples \dontrun{kurtosis(df$c)} +setMethod("kurtosis", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "kurtosis", x@jc) +column(jc) + }) + #' last #' #' Aggregate function: returns the last value in a group. @@ -861,6 +877,28 @@ setMethod("rtrim", column(jc) }) +#' sd +#' +#' Aggregate function: alias for \link{stddev_samp} +#' +#' @rdname sd +#' @name sd +#' @family agg_funcs +#' @seealso \link{stddev_pop}, \link{stddev_samp} +#' @export +#' @examples +#'\dontrun{ +#'stddev(df$c) +#'select(df, stddev(df$age)) +#'agg(df, sd(df$age)) +#'} +setMethod("sd", + signature(x = "Column"), + function(x, na.rm = FALSE) { +# In R, sample standard deviation is calculated with the sd() function. +stddev_samp(x) + }) + #' second #' #' Extracts the seconds as an integer from a given date/timestamp/string. @@ -958,6 +996,22 @@ setMethod("size", column(jc) }) +#' skewness +#' +#' Aggregate function: returns the skewness of the values in a group. +#' +#' @rdname skewness +#' @name skewness +#' @family agg_funcs +#' @export +#' @examples \dontrun{skewness(df$c)} +setMethod("skewness", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "skewness", x@jc) +column(jc) + }) + #' soundex #' #' Return the soundex code for the specified expression. 
@@ -974,6 +1028,49 @@ setMethod("soundex", column(jc) }) +#' @rdname sd +#' @name stddev +setMethod("stddev", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "stddev", x@jc) +column(jc) +
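The new R generics above are thin wrappers that delegate to the corresponding aggregate functions in org.apache.spark.sql.functions via callJStatic. For reference, a hedged Scala sketch of the equivalent calls on the JVM side; the DataFrame and its `age` column are assumed for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{kurtosis, skewness, stddev, stddev_pop, stddev_samp, var_pop, var_samp, variance}

// Sample and population variants of standard deviation and variance,
// plus the higher-moment aggregates, applied to a numeric column.
def describeAge(df: DataFrame): DataFrame =
  df.agg(
    stddev(df("age")),        // documented in the R wrapper as an alias for stddev_samp
    stddev_pop(df("age")),
    stddev_samp(df("age")),
    variance(df("age")),
    var_pop(df("age")),
    var_samp(df("age")),
    skewness(df("age")),
    kurtosis(df("age")))
```

On the R side, `sd(df$age)` and `var(df$age)` map onto the same sample-statistic variants, which is why the wrappers were checked for name clashes with base R.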
spark git commit: [SPARK-11463] [PYSPARK] only install signal in main thread
Repository: spark Updated Branches: refs/heads/branch-1.6 649a0a357 -> 85bc72908

[SPARK-11463] [PYSPARK] only install signal in main thread

Install the SIGINT handler only in the main thread; otherwise creating a SparkContext from a non-main thread fails.

Author: Davies Liu

Closes #9574 from davies/python_signal.

(cherry picked from commit bd70244b3cda62cc447fd4cc343d4eb5ddaec893) Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85bc7290 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85bc7290 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85bc7290 Branch: refs/heads/branch-1.6 Commit: 85bc7290881bee4f02833f0802571ef589e96856 Parents: 649a0a3 Author: Davies Liu Authored: Tue Nov 10 22:46:17 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 22:46:28 2015 -0800
--
 python/pyspark/context.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/85bc7290/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index afd74d9..77710a1 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -21,6 +21,7 @@
 import os
 import shutil
 import signal
 import sys
+import threading
 from threading import RLock
 from tempfile import NamedTemporaryFile
@@ -222,7 +223,9 @@ class SparkContext(object):
         def signal_handler(signal, frame):
             self.cancelAllJobs()
 
-        signal.signal(signal.SIGINT, signal_handler)
+        # see http://stackoverflow.com/questions/23206787/
+        if isinstance(threading.current_thread(), threading._MainThread):
+            signal.signal(signal.SIGINT, signal_handler)
 
     def _initialize_context(self, jconf):
         """
spark git commit: [SPARK-11463] [PYSPARK] only install signal in main thread
Repository: spark Updated Branches: refs/heads/master 1a8e0468a -> bd70244b3

[SPARK-11463] [PYSPARK] only install signal in main thread

Install the SIGINT handler only in the main thread; otherwise creating a SparkContext from a non-main thread fails.

Author: Davies Liu

Closes #9574 from davies/python_signal.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd70244b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd70244b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd70244b Branch: refs/heads/master Commit: bd70244b3cda62cc447fd4cc343d4eb5ddaec893 Parents: 1a8e046 Author: Davies Liu Authored: Tue Nov 10 22:46:17 2015 -0800 Committer: Davies Liu Committed: Tue Nov 10 22:46:17 2015 -0800
--
 python/pyspark/context.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/bd70244b/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index afd74d9..77710a1 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -21,6 +21,7 @@
 import os
 import shutil
 import signal
 import sys
+import threading
 from threading import RLock
 from tempfile import NamedTemporaryFile
@@ -222,7 +223,9 @@ class SparkContext(object):
         def signal_handler(signal, frame):
             self.cancelAllJobs()
 
-        signal.signal(signal.SIGINT, signal_handler)
+        # see http://stackoverflow.com/questions/23206787/
+        if isinstance(threading.current_thread(), threading._MainThread):
+            signal.signal(signal.SIGINT, signal_handler)
 
     def _initialize_context(self, jconf):
         """
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.2-rc2 [deleted] 49c30c1f6