[spark] branch master updated: [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on completion of the iteration

2019-01-31 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new aea5f50  [SPARK-26525][SHUFFLE] Fast release 
ShuffleBlockFetcherIterator on completion of the iteration
aea5f50 is described below

commit aea5f506463c19fac97547ba7a28f9dd491e3a6a
Author: Liupengcheng 
AuthorDate: Fri Feb 1 13:47:14 2019 +0800

[SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on 
completion of the iteration

## What changes were proposed in this pull request?

Currently, Spark does not release ShuffleBlockFetcherIterator until the 
whole task finishes. In some conditions, this causes a memory leak.

An example is `rdd.repartition(m).coalesce(n, shuffle = false).save`. Each 
`ShuffleBlockFetcherIterator` holds some metadata about the map status 
(`blocksByAddress`), and each result task keeps n (at most the number of 
shuffle partitions) ShuffleBlockFetcherIterators. That memory is never released 
until the task completes, because the iterators are referenced by the completion 
callbacks of TaskContext. In some cases this can take a huge amount of memory 
and cause an OOM.

Actually, we can release ShuffleBlockFetcherIterator as soon as it has been 
fully consumed.
This PR resolves this problem.
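
The idea of releasing an iterator's resources as soon as it is exhausted can be 
sketched in plain Python. This is an illustration only, not Spark code: 
`completion_iterator` and `FetcherCleanup` are hypothetical names, and the 
cleanup is made idempotent on the assumption that the task-completion hook 
may still call it a second time.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def completion_iterator(source: Iterable[T], on_complete: Callable[[], None]) -> Iterator[T]:
    """Yield every element of `source`, then run `on_complete` once it is exhausted."""
    for item in source:
        yield item
    # Reached only after the last element has been handed to the consumer.
    on_complete()


class FetcherCleanup:
    """Hypothetical stand-in for the fetcher's cleanup listener; safe to call twice."""

    def __init__(self) -> None:
        self.block_metadata = ["block-0", "block-1", "block-2"]
        self.calls = 0
        self.released = False

    def __call__(self) -> None:
        self.calls += 1
        if not self.released:
            self.block_metadata = []  # drop the references so they can be collected
            self.released = True


cleanup = FetcherCleanup()
# Consuming the wrapped iterator to the end triggers the cleanup immediately.
blocks = list(completion_iterator(list(cleanup.block_metadata), cleanup))

# The task-completion hook may still fire later; the second call is a no-op.
cleanup()
```

The metadata is dropped when the consumer reaches the end of the iterator, 
rather than when the surrounding task finishes.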

## How was this patch tested?

Unit tests.

Closes #23438 from liupc/Fast-release-shuffleblockfetcheriterator.

Lead-authored-by: Liupengcheng 
Co-authored-by: liupengcheng 
Signed-off-by: Wenchen Fan 
---
 .../spark/shuffle/BlockStoreShuffleReader.scala|  2 +-
 .../storage/ShuffleBlockFetcherIterator.scala  | 34 --
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index daafe30..c5eefc7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
   SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
   SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
   SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
-  readMetrics)
+  readMetrics).toCompletionIterator
 
 val serializerInstance = dep.serializer.newInstance()
 
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f73c21b..3966980 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -31,7 +31,7 @@ import 
org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.shuffle.{FetchFailedException, 
ShuffleReadMetricsReporter}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, 
Utils}
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
 
 /**
@@ -160,6 +160,8 @@ final class ShuffleBlockFetcherIterator(
   @GuardedBy("this")
   private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]()
 
+  private[this] val onCompleteCallback = new 
ShuffleFetchCompletionListener(this)
+
   initialize()
 
   // Decrements the buffer reference count.
@@ -192,7 +194,7 @@ final class ShuffleBlockFetcherIterator(
   /**
* Mark the iterator as zombie, and release all buffers that haven't been 
deserialized yet.
*/
-  private[this] def cleanup() {
+  private[storage] def cleanup() {
 synchronized {
   isZombie = true
 }
@@ -364,7 +366,7 @@ final class ShuffleBlockFetcherIterator(
 
   private[this] def initialize(): Unit = {
 // Add a task completion callback (called in both success case and failure 
case) to cleanup.
-context.addTaskCompletionListener[Unit](_ => cleanup())
+context.addTaskCompletionListener(onCompleteCallback)
 
 // Split local and remote blocks.
 val remoteRequests = splitLocalRemoteBlocks()
@@ -509,6 +511,11 @@ final class ShuffleBlockFetcherIterator(
 (currentResult.blockId, new BufferReleasingInputStream(input, this))
   }
 
+  def toCompletionIterator: Iterator[(BlockId, InputStream)] = {
+CompletionIterator[(BlockId, InputStream), this.type](this,
+  onCompleteCallback.onComplete(context))
+  }
+
   private def fetchUpToMaxBytes(): Unit = {
 // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a 
remote 

[spark] branch master updated: [SPARK-26730][SQL] Strip redundant AssertNotNull for ExpressionEncoder's serializer

2019-01-31 Thread wenchen
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8f968b4  [SPARK-26730][SQL] Strip redundant AssertNotNull for 
ExpressionEncoder's serializer
8f968b4 is described below

commit 8f968b4c064558d6a4c38e7e2592e49d9507
Author: wuyi 
AuthorDate: Fri Feb 1 10:48:37 2019 +0800

[SPARK-26730][SQL] Strip redundant AssertNotNull for ExpressionEncoder's 
serializer

## What changes were proposed in this pull request?

For types like Product, we already add AssertNotNull when we construct the 
serializer (see code below), so we can strip the redundant AssertNotNull for 
those types.

```
val fieldValue = Invoke(
AssertNotNull(inputObject, walkedTypePath), fieldName, 
dataTypeFor(fieldType),
returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
```
## How was this patch tested?

Existing tests.

Closes #23651 from 
Ngone51/dev-strip-redundant-assertnotnull-for-ecnoder-serializer.

Authored-by: wuyi 
Signed-off-by: Wenchen Fan 
---
 .../main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 1b06835..d5af91a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -631,8 +631,11 @@ object ScalaReflection extends ScalaReflection {
   "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
   }
 
-  val fieldValue = Invoke(
-AssertNotNull(inputObject, walkedTypePath), fieldName, 
dataTypeFor(fieldType),
+  // SPARK-26730 inputObject won't be null with If's guard below. And 
KnownNotNul
+  // is necessary here. Because for a nullable nested inputObject with 
struct data
+  // type, e.g. StructType(IntegerType, StringType), it will return 
nullable=true
+  // for IntegerType without KnownNotNull. And that's what we do not 
expect to.
+  val fieldValue = Invoke(KnownNotNull(inputObject), fieldName, 
dataTypeFor(fieldType),
 returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
   val clsName = getClassNameFromType(fieldType)
   val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" 
+: walkedTypePath


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by

2019-01-31 Thread gurwls223
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 2a83431  [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count 
optimization in JSON datasource by
2a83431 is described below

commit 2a8343121e62aabe5c69d1e20fbb2c01e2e520e7
Author: Hyukjin Kwon 
AuthorDate: Fri Feb 1 10:22:05 2019 +0800

[SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in 
JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of #21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can contain 0 records, 1 record, or 
multiple records, depending on the input.

See also https://github.com/apache/spark/pull/23665#discussion_r251276720.
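
A minimal sketch of why a line count cannot stand in for a record count. It 
uses only Python's standard `json` module, not Spark's parser; the helper 
`records_in_line` is hypothetical and assumes, as the cases above suggest, that 
a top-level array yields one record per element.

```python
import json
from typing import List


def records_in_line(line: str) -> int:
    """Count the records one input line yields once it is actually parsed."""
    text = line.strip()
    if not text:
        return 0  # empty string: no record
    value = json.loads(text)
    if isinstance(value, list):
        return len(value)  # a top-level array yields one record per element
    return 1  # a single top-level object


lines: List[str] = ['[{"a": 1}, {"a": 2}]', "[]", '{"a": 3}', ""]
per_line = [records_in_line(line) for line in lines]
naive_count = len(lines)      # what counting lines without parsing would give
parsed_count = sum(per_line)  # what parsing each line gives
```

Here the four lines hold three records in total, so skipping the parse would 
overcount.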

## How was this patch tested?

Manually tested.

Closes #23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/sql/DataFrameReader.scala |  6 ++
 .../sql/execution/datasources/FailureSafeParser.scala| 11 ++-
 .../sql/execution/datasources/csv/UnivocityParser.scala  | 16 +++-
 .../sql/execution/datasources/json/JsonDataSource.scala  |  6 ++
 .../sql/execution/datasources/json/JsonBenchmarks.scala  |  4 
 5 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 869c584..e9278a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -450,8 +450,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 input => rawParser.parse(input, createParser, UTF8String.fromString),
 parsedOptions.parseMode,
 schema,
-parsedOptions.columnNameOfCorruptRecord,
-parsedOptions.multiLine)
+parsedOptions.columnNameOfCorruptRecord)
   iter.flatMap(parser.parse)
 }
 sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
jsonDataset.isStreaming)
@@ -526,8 +525,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 input => Seq(rawParser.parse(input)),
 parsedOptions.parseMode,
 schema,
-parsedOptions.columnNameOfCorruptRecord,
-parsedOptions.multiLine)
+parsedOptions.columnNameOfCorruptRecord)
   iter.flatMap(parser.parse)
 }
 sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
csvDataset.isStreaming)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 90e8166..e618f17 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -29,8 +29,7 @@ class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
 schema: StructType,
-columnNameOfCorruptRecord: String,
-isMultiLine: Boolean) {
+columnNameOfCorruptRecord: String) {
 
   private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
@@ -58,15 +57,9 @@ class FailureSafeParser[IN](
 }
   }
 
-  private val skipParsing = !isMultiLine && mode == PermissiveMode && 
schema.isEmpty
-
   def parse(input: IN): Iterator[InternalRow] = {
 try {
- if (skipParsing) {
-   Iterator.single(InternalRow.empty)
- } else {
-   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () 
=> null))
- }
+  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () 
=> null))
 } catch {
   case e: BadRecordException => mode match {
 case PermissiveMode =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 9088d43..42e3964 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,11 +203,19 @@ class UnivocityParser(
 }
   }
 

[spark] branch master updated: [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins

2019-01-31 Thread gurwls223
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cdd694c  [SPARK-7721][INFRA] Run and generate test coverage report 
from Python via Jenkins
cdd694c is described below

commit cdd694c52b53165acba6faabaf3a1fbaa925ac2e
Author: Hyukjin Kwon 
AuthorDate: Fri Feb 1 10:18:08 2019 +0800

[SPARK-7721][INFRA] Run and generate test coverage report from Python via 
Jenkins

## What changes were proposed in this pull request?

### Background

Currently, the test script that generates coverage information has already 
been merged into Spark: https://github.com/apache/spark/pull/20204

So we can generate the coverage report and site with, for example:

```
run-tests-with-coverage --python-executables=python3 --modules=pyspark-sql
```

in the same way as the `run-tests` script in `./python`.

### Proposed change

The next step is to host this coverage report via `github.io` automatically
by Jenkins (see https://spark-test.github.io/pyspark-coverage-site/).

This uses my testing account for Spark, spark-test, which was shared with 
Felix and Shivaram a long time ago for testing purposes, including AppVeyor.

In short, this PR aims to run the coverage in

[spark-master-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/)

In that specific job, it will clone the page and rebase the up-to-date 
PySpark test coverage from the latest commit, for instance as below:

```bash
# Clone PySpark coverage site.
git clone https://github.com/spark-test/pyspark-coverage-site.git

# Remove existing HTMLs.
rm -fr pyspark-coverage-site/*

# Copy generated coverage HTMLs.
cp -r .../python/test_coverage/htmlcov/* pyspark-coverage-site/

# Check out to a temporary branch.
git symbolic-ref HEAD refs/heads/latest_branch

# Add all the files.
git add -A

# Commit current HTMLs.
git commit -am "Coverage report at latest commit in Apache Spark"

# Delete the old branch.
git branch -D gh-pages

# Rename the temporary branch to gh-pages.
git branch -m gh-pages

# Finally, force update to our repository.
git push -f origin gh-pages
```

So a single, up-to-date coverage report is shown on the `github.io` page. 
The commands above were manually tested.

### TODOs

- [x] Write a draft HyukjinKwon
- [x] `pip install coverage` to all python implementations (pypy, python2, 
python3) in Jenkins workers  - shaneknapp
- [x] Set hidden `SPARK_TEST_KEY` for spark-test's password in Jenkins via 
Jenkins's feature
 This should be set in both PR builder and 
`spark-master-test-sbt-hadoop-2.7` so that later other PRs can test and fix the 
bugs - shaneknapp
- [x] Set an environment variable that indicates 
`spark-master-test-sbt-hadoop-2.7` so that that specific build can report and 
update the coverage site - shaneknapp
- [x] Make the PR builder's tests pass HyukjinKwon
- [x] Fix flaky test related to coverage HyukjinKwon
  -  6 consecutive passes out of 7 runs

This PR will be co-authored by me and shaneknapp.

## How was this patch tested?

It will be tested via Jenkins.

Closes #23117 from HyukjinKwon/SPARK-7721.

Lead-authored-by: Hyukjin Kwon 
Co-authored-by: hyukjinkwon 
Co-authored-by: shane knapp 
Signed-off-by: Hyukjin Kwon 
---
 README.md  |  1 +
 dev/run-tests.py   | 63 --
 python/pyspark/streaming/tests/test_dstream.py | 10 
 3 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index f3b90ce..271f2f5 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,7 @@
 
 [![Jenkins 
Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7)
 [![AppVeyor 
Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark)
+[![PySpark 
Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan=brightgreen=plastic)](https://spark-test.github.io/pyspark-coverage-site)
 
 Spark is a fast and general cluster computing system for Big Data. It provides
 high-level APIs in Scala, Java, Python, and R, and an optimized engine that
diff --git a/dev/run-tests.py b/dev/run-tests.py
index e1ed274..edd89c9 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py

[spark] branch master updated: [SPARK-26787] Fix standardizeLabels error message in WeightedLeastSquares

2019-01-31 Thread srowen
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e44f308  [SPARK-26787] Fix standardizeLabels error message in 
WeightedLeastSquares
e44f308 is described below

commit e44f308593e8cb02cdaeb5533f387c465aa60c6c
Author: bscan 
AuthorDate: Thu Jan 31 19:50:18 2019 -0600

[SPARK-26787] Fix standardizeLabels error message in WeightedLeastSquares

Error message falsely states standardization=True is causing a problem, 
even when standardization=False. The real issue is standardizeLabels=True, 
which is set automatically in LinearRegression and not currently available in 
the Public API.

## What changes were proposed in this pull request?

A simple change to an error message. More details here: 
https://jira.apache.org/jira/browse/SPARK-26787

## How was this patch tested?

This does not change any functionality.

Closes #23705 from bscan/bscan-errormsg-1.

Authored-by: bscan 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala 
b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index 134d6a9..9f32603 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -133,7 +133,7 @@ private[ml] class WeightedLeastSquares(
 return new WeightedLeastSquaresModel(coefficients, intercept, 
diagInvAtWA, Array(0D))
   } else {
 require(!(regParam > 0.0 && standardizeLabel), "The standard deviation 
of the label is " +
-  "zero. Model cannot be regularized with standardization=true")
+  "zero. Model cannot be regularized when labels are standardized.")
 instr.logWarning(s"The standard deviation of the label is zero. 
Consider setting " +
   s"fitIntercept=true.")
   }





[spark] branch master updated: [SPARK-25997][ML] add Python example code for Power Iteration Clustering in spark.ml

2019-01-31 Thread srowen
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f7d87b1  [SPARK-25997][ML] add Python example code for Power Iteration 
Clustering in spark.ml
f7d87b1 is described below

commit f7d87b1685eeac3a2c8b903ddb86e1921fcc193c
Author: Huaxin Gao 
AuthorDate: Thu Jan 31 19:33:44 2019 -0600

[SPARK-25997][ML] add Python example code for Power Iteration Clustering in 
spark.ml

## What changes were proposed in this pull request?

Add a Python example for Power Iteration Clustering in spark.ml.

## How was this patch tested?

Manually tested

Closes #22996 from huaxingao/spark-25997.

Authored-by: Huaxin Gao 
Signed-off-by: Sean Owen 
---
 docs/ml-clustering.md  |  6 +++
 .../ml/power_iteration_clustering_example.py   | 49 ++
 2 files changed, 55 insertions(+)

diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md
index 65f2652..21e38ca 100644
--- a/docs/ml-clustering.md
+++ b/docs/ml-clustering.md
@@ -298,6 +298,12 @@ Refer to the [Java API 
docs](api/java/org/apache/spark/ml/clustering/PowerIterat
 {% include_example 
java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java %}
 
 
+
+Refer to the [Python API 
docs](api/python/pyspark.ml.html#pyspark.ml.clustering.PowerIterationClustering)
 for more details.
+
+{% include_example python/ml/power_iteration_clustering_example.py %}
+
+
 
 
 Refer to the [R API docs](api/R/spark.powerIterationClustering.html) for more 
details.
diff --git a/examples/src/main/python/ml/power_iteration_clustering_example.py 
b/examples/src/main/python/ml/power_iteration_clustering_example.py
new file mode 100644
index 000..c983c4a
--- /dev/null
+++ b/examples/src/main/python/ml/power_iteration_clustering_example.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+An example demonstrating PowerIterationClustering.
+Run with:
+  bin/spark-submit 
examples/src/main/python/ml/power_iteration_clustering_example.py
+"""
+# $example on$
+from pyspark.ml.clustering import PowerIterationClustering
+# $example off$
+from pyspark.sql import SparkSession
+
+if __name__ == "__main__":
+spark = SparkSession\
+.builder\
+.appName("PowerIterationClusteringExample")\
+.getOrCreate()
+
+# $example on$
+df = spark.createDataFrame([
+(0, 1, 1.0),
+(0, 2, 1.0),
+(1, 2, 1.0),
+(3, 4, 1.0),
+(4, 0, 0.1)
+], ["src", "dst", "weight"])
+
+pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", 
weightCol="weight")
+
+# Shows the cluster assignment
+pic.assignClusters(df).show()
+# $example off$
+
+spark.stop()





[spark] branch master updated: [DOC][MINOR] Add metrics instance 'mesos_cluster' to monitoring doc

2019-01-31 Thread srowen
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0fe9c14  [DOC][MINOR] Add metrics instance 'mesos_cluster' to 
monitoring doc
0fe9c14 is described below

commit 0fe9c144fd45e042b41a8fc69b96c377b9f08c4b
Author: SongYadong 
AuthorDate: Thu Jan 31 18:30:17 2019 -0600

[DOC][MINOR] Add metrics instance 'mesos_cluster' to monitoring doc

## What changes were proposed in this pull request?

The metrics instance "mesos_cluster" exists in Spark but is not mentioned in 
monitoring.md. This PR adds it.

## How was this patch tested?

Manually tested.

Closes #23691 from SongYadong/doc_mesos_metrics_inst.

Authored-by: SongYadong 
Signed-off-by: Sean Owen 
---
 docs/monitoring.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 1e292d2..726fb5c 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -659,6 +659,7 @@ set of sinks to which metrics are reported. The following 
instances are currentl
 * `driver`: The Spark driver process (the process in which your SparkContext 
is created).
 * `shuffleService`: The Spark shuffle service.
 * `applicationMaster`: The Spark ApplicationMaster when running on YARN.
+* `mesos_cluster`: The Spark cluster scheduler when running on Mesos.
 
 Each instance can report to zero or more _sinks_. Sinks are contained in the
 `org.apache.spark.metrics.sink` package:





[spark] branch branch-2.3 updated: [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

2019-01-31 Thread srowen
srowen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new 537d15c  [SPARK-26757][GRAPHX] Return 0 for `count` on empty 
Edge/Vertex RDDs
537d15c is described below

commit 537d15ca5edb0f21ce2a15b30866a53675f90382
Author: Huon Wilson 
AuthorDate: Thu Jan 31 17:27:11 2019 -0600

[SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

## What changes were proposed in this pull request?

Previously a "java.lang.UnsupportedOperationException: empty
collection" exception would be thrown due to using `reduce`, rather
than `fold` or similar that can tolerate empty RDDs.

This behaviour has existed for the Vertex RDDs since it was introduced
in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour
was inherited by the Edge RDDs via copy-paste in
ee29ef3800438501e0ff207feb00a28973fc0769.
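
The difference between the two operations can be seen in a small plain-Python 
analogue. This is not the Spark API: `functools.reduce` without an initial 
value plays the role of `reduce`, and the same call with an initial value of 0 
plays the role of `fold(0)`. The variable names are illustrative.

```python
from functools import reduce
from operator import add
from typing import List

partition_sizes: List[int] = []  # nothing to sum, as with an empty RDD

try:
    reduce(add, partition_sizes)  # no initial value: nothing to return
    reduce_failed = False
except TypeError:
    reduce_failed = True

# With an initial value (the "zero" element), an empty input is well defined.
count = reduce(add, partition_sizes, 0)
nonempty_count = reduce(add, [3, 4, 5], 0)
```

Supplying the zero element gives the empty case a defined result without 
changing the result for non-empty input.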

## How was this patch tested?

Two new unit tests.

Closes #23681 from huonw/empty-graphx.

Authored-by: Huon Wilson 
Signed-off-by: Sean Owen 
(cherry picked from commit da526985c7574dccdcc0cca7452e2e999a5b3012)
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala |  2 +-
 .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala|  2 +-
 .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala  |  2 +-
 .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++
 .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala   | 11 +++
 .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala  |  9 +
 6 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 376c7b0..eb8abd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] 
(
 
   /** The number of edges in the RDD. */
   override def count(): Long = {
-partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
   }
 
   override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, 
VD] =
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 3c6f22d..2da9762 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (
 
   /** The number of vertices in the RDD. */
   override def count(): Long = {
-partitionsRDD.map(_.size.toLong).reduce(_ + _)
+partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
   }
 
   override private[graphx] def mapVertexPartitions[VD2: ClassTag](
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 59fdd85..2847a4e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,7 +72,7 @@ object SVDPlusPlus {
 
 // calculate global rating mean
 edges.cache()
-val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, 
a._2 + b._2))
+val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + 
b._1, a._2 + b._2))
 val u = rs / rc
 
 // construct graph
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index 7a24e32..8fd3e6f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with 
LocalSparkContext {
 }
   }
 
+  test("count") {
+withSpark { sc =>
+  val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
+  assert(empty.count === 0)
+
+  val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
+  val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
+  assert(nonempty.count === edges.size)
+}
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 8e63043..434e6a8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -223,4 +223,15 @@ class 

[spark] branch master updated: [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

2019-01-31 Thread srowen
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new da52698  [SPARK-26757][GRAPHX] Return 0 for `count` on empty 
Edge/Vertex RDDs
da52698 is described below

commit da526985c7574dccdcc0cca7452e2e999a5b3012
Author: Huon Wilson 
AuthorDate: Thu Jan 31 17:27:11 2019 -0600

[SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

## What changes were proposed in this pull request?

Previously a "java.lang.UnsupportedOperationException: empty
collection" exception would be thrown due to using `reduce`, rather
than `fold` or similar that can tolerate empty RDDs.

This behaviour has existed for the Vertex RDDs since it was introduced
in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour
was inherited by the Edge RDDs via copy-paste in
ee29ef3800438501e0ff207feb00a28973fc0769.

## How was this patch tested?

Two new unit tests.

Closes #23681 from huonw/empty-graphx.

Authored-by: Huon Wilson 
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala |  2 +-
 .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala|  2 +-
 .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala  |  2 +-
 .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++
 .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala   | 11 +++
 .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala  |  9 +
 6 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 376c7b0..eb8abd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] 
(
 
   /** The number of edges in the RDD. */
   override def count(): Long = {
-partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
   }
 
   override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, 
VD] =
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 3c6f22d..2da9762 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (
 
   /** The number of vertices in the RDD. */
   override def count(): Long = {
-partitionsRDD.map(_.size.toLong).reduce(_ + _)
+partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
   }
 
   override private[graphx] def mapVertexPartitions[VD2: ClassTag](
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 59fdd85..2847a4e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,7 +72,7 @@ object SVDPlusPlus {
 
 // calculate global rating mean
 edges.cache()
-val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, 
a._2 + b._2))
+val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + 
b._1, a._2 + b._2))
 val u = rs / rc
 
 // construct graph
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index 7a24e32..8fd3e6f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with 
LocalSparkContext {
 }
   }
 
+  test("count") {
+withSpark { sc =>
+  val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
+  assert(empty.count === 0)
+
+  val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
+  val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
+  assert(nonempty.count === edges.size)
+}
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 8e63043..434e6a8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with 
LocalSparkContext {
   assert(verts.collect().toSeq === data) // test 

[spark] branch branch-2.4 updated: [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

2019-01-31 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 2b5e033  [SPARK-26757][GRAPHX] Return 0 for `count` on empty 
Edge/Vertex RDDs
2b5e033 is described below

commit 2b5e033eb937a8074e454e1995616f8a1bf370f8
Author: Huon Wilson 
AuthorDate: Thu Jan 31 17:27:11 2019 -0600

[SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

## What changes were proposed in this pull request?

Previously a "java.lang.UnsupportedOperationException: empty
collection" exception would be thrown due to using `reduce`, rather
than `fold` or similar that can tolerate empty RDDs.

This behaviour has existed for the Vertex RDDs since it was introduced
in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour
was inherited by the Edge RDDs via copy-paste in
ee29ef3800438501e0ff207feb00a28973fc0769.

## How was this patch tested?

Two new unit tests.

Closes #23681 from huonw/empty-graphx.

Authored-by: Huon Wilson 
Signed-off-by: Sean Owen 
(cherry picked from commit da526985c7574dccdcc0cca7452e2e999a5b3012)
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala |  2 +-
 .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala|  2 +-
 .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala  |  2 +-
 .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++
 .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala   | 11 +++
 .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala  |  9 +
 6 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 376c7b0..eb8abd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] 
(
 
   /** The number of edges in the RDD. */
   override def count(): Long = {
-partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
   }
 
   override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, 
VD] =
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 3c6f22d..2da9762 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (
 
   /** The number of vertices in the RDD. */
   override def count(): Long = {
-partitionsRDD.map(_.size.toLong).reduce(_ + _)
+partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
   }
 
   override private[graphx] def mapVertexPartitions[VD2: ClassTag](
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 59fdd85..2847a4e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,7 +72,7 @@ object SVDPlusPlus {
 
 // calculate global rating mean
 edges.cache()
-val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, 
a._2 + b._2))
+val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + 
b._1, a._2 + b._2))
 val u = rs / rc
 
 // construct graph
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index 7a24e32..8fd3e6f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with 
LocalSparkContext {
 }
   }
 
+  test("count") {
+withSpark { sc =>
+  val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
+  assert(empty.count === 0)
+
+  val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
+  val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
+  assert(nonempty.count === edges.size)
+}
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 8e63043..434e6a8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -223,4 +223,15 @@ class 

[spark] branch master updated: [SPARK-26799][BUILD] Make ANTLR v4 version consistent between Maven and SBT

2019-01-31 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2514163  [SPARK-26799][BUILD] Make ANTLR v4 version consistent between 
Maven and SBT
2514163 is described below

commit 2514163366feb91de665b704a6fb703855db6bee
Author: seancxmao 
AuthorDate: Thu Jan 31 14:39:32 2019 -0800

[SPARK-26799][BUILD] Make ANTLR v4 version consistent between Maven and SBT

## What changes were proposed in this pull request?
Currently ANTLR v4 versions used by Maven and SBT are slightly different. 
Maven uses `4.7.1` while SBT uses `4.7`.

* Maven(`pom.xml`): `4.7.1`
* SBT(`project/SparkBuild.scala`): `antlr4Version in Antlr4 := "4.7"`

We should make Maven and SBT use a single version. Furthermore, we should 
specify the antlr4 version in one place to avoid mismatches between Maven and 
SBT in the future.

This PR lets SBT use the antlr4 version specified in the Maven POM file, 
rather than specifying its own antlr4 version. This is the same way 
`hadoop.version` is specified in `project/SparkBuild.scala`.
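
As a rough sketch of the lookup involved: the POM properties are a
`java.util.Properties` object, and a missing key comes back as `null`. The
helper below is hypothetical (it is not part of this change) and shows one
way to fail loudly instead of passing a `null` version along.

```scala
import java.util.Properties

object PomPropertySketch {
  // Hypothetical helper: read a required property and fail with a clear
  // message when the key is absent, instead of returning null.
  def requiredProperty(props: Properties, key: String): String =
    Option(props.getProperty(key)).getOrElse(
      throw new NoSuchElementException(s"missing POM property: $key"))
}
```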

## How was this patch tested?
Test locally.

After running `sbt compile`, the Java files generated by ANTLR are located at:

```

sql/catalyst/target/scala-2.12/src_managed/main/antlr4/org/apache/spark/sql/catalyst/parser/*.java
```

Each of these Java files starts with a header comment, which shows that SBT 
now uses ANTLR `4.7.1`.

```
// Generated from 
.../spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
 by ANTLR 4.7.1
```

Closes #23713 from seancxmao/antlr4-version-consistent.

Authored-by: seancxmao 
Signed-off-by: gatorsmile 
---
 project/SparkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8d836da..65ba25c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -568,7 +568,7 @@ object OldDeps {
 
 object Catalyst {
   lazy val settings = antlr4Settings ++ Seq(
-antlr4Version in Antlr4 := "4.7",
+antlr4Version in Antlr4 := 
SbtPomKeys.effectivePom.value.getProperties.get("antlr4.version").asInstanceOf[String],
 antlr4PackageName in Antlr4 := 
Some("org.apache.spark.sql.catalyst.parser"),
 antlr4GenListener in Antlr4 := true,
 antlr4GenVisitor in Antlr4 := true


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.3 updated: [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display

2019-01-31 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new 94a4b46  [SPARK-26726] Synchronize the amount of memory used by the 
broadcast variable to the UI display
94a4b46 is described below

commit 94a4b465bba39dc866c2b060efd1002b55aa7ac2
Author: 韩田田00222924 
AuthorDate: Thu Jan 31 09:17:33 2019 -0800

[SPARK-26726] Synchronize the amount of memory used by the broadcast 
variable to the UI display

…not synchronized to the UI display

## What changes were proposed in this pull request?
The amount of memory used by broadcast variables was not reflected in the UI 
display.
This change adds a `BroadcastBlockId` case to the block-update match in 
`AppStatusListener` and updates the executor's memory usage accordingly.
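
The accounting in the diff below can be sketched in isolation. Everything
here is a simplified stand-in: `Level` and `ExecUsage` are hypothetical
types, and `addDelta` assumes the counters are clamped at zero.

```scala
object BroadcastAccountingSketch {
  // Hypothetical stand-ins for the storage level and the executor's counters.
  final case class Level(useMemory: Boolean, useDisk: Boolean)
  final case class ExecUsage(memoryUsed: Long, diskUsed: Long)

  // Assumption: counters never drop below zero.
  def addDelta(old: Long, delta: Long): Long = math.max(0L, old + delta)

  // A block update adds its size when the level says the block is stored,
  // and subtracts it when the block is no longer stored there.
  def update(exec: ExecUsage, level: Level, memSize: Long, diskSize: Long): ExecUsage = {
    val memoryDelta = memSize * (if (level.useMemory) 1 else -1)
    val diskDelta = diskSize * (if (level.useDisk) 1 else -1)
    ExecUsage(addDelta(exec.memoryUsed, memoryDelta), addDelta(exec.diskUsed, diskDelta))
  }
}
```

Storing a block and later removing it with the same size returns the
counters to their previous values.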

## How was this patch tested?

We can test this patch with unit tests.

Closes #23649 from httfighter/SPARK-26726.

Lead-authored-by: 韩田田00222924 
Co-authored-by: han.tiant...@zte.com.cn 
Signed-off-by: Marcelo Vanzin 
(cherry picked from commit f4a17e916b729f9dc46e859b50a416db1e37b92e)
Signed-off-by: Marcelo Vanzin 
---
 .../apache/spark/status/AppStatusListener.scala| 44 +-
 .../spark/status/AppStatusListenerSuite.scala  | 18 +
 2 files changed, 53 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 3164dc7..325e3bd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -678,6 +678,7 @@ private[spark] class AppStatusListener(
 event.blockUpdatedInfo.blockId match {
   case block: RDDBlockId => updateRDDBlock(event, block)
   case stream: StreamBlockId => updateStreamBlock(event, stream)
+  case broadcast: BroadcastBlockId => updateBroadcastBlock(event, 
broadcast)
   case _ =>
 }
   }
@@ -736,15 +737,7 @@ private[spark] class AppStatusListener(
 // Update the executor stats first, since they are used to calculate the 
free memory
 // on tracked RDD distributions.
 maybeExec.foreach { exec =>
-  if (exec.hasMemoryInfo) {
-if (storageLevel.useOffHeap) {
-  exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
-} else {
-  exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
-}
-  }
-  exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
-  exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
 }
 
 // Update the block entry in the RDD info, keeping track of the deltas 
above so that we
@@ -846,6 +839,39 @@ private[spark] class AppStatusListener(
 }
   }
 
+  private def updateBroadcastBlock(
+  event: SparkListenerBlockUpdated,
+  broadcast: BroadcastBlockId): Unit = {
+val executorId = event.blockUpdatedInfo.blockManagerId.executorId
+liveExecutors.get(executorId).foreach { exec =>
+  val now = System.nanoTime()
+  val storageLevel = event.blockUpdatedInfo.storageLevel
+
+  // Whether values are being added to or removed from the existing 
accounting.
+  val diskDelta = event.blockUpdatedInfo.diskSize * (if 
(storageLevel.useDisk) 1 else -1)
+  val memoryDelta = event.blockUpdatedInfo.memSize * (if 
(storageLevel.useMemory) 1 else -1)
+
+  updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
+  maybeUpdate(exec, now)
+}
+  }
+
+  private def updateExecutorMemoryDiskInfo(
+  exec: LiveExecutor,
+  storageLevel: StorageLevel,
+  memoryDelta: Long,
+  diskDelta: Long): Unit = {
+if (exec.hasMemoryInfo) {
+  if (storageLevel.useOffHeap) {
+exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
+  } else {
+exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
+  }
+}
+exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
+exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  }
+
   private def getOrCreateStage(info: StageInfo): LiveStage = {
 val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
   new Function[(Int, Int), LiveStage]() {
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 293cf0d..61ed0c8 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -875,6 +875,24 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
 intercept[NoSuchElementException] {
  

[spark] branch branch-2.4 updated: [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display

2019-01-31 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new d9403e4  [SPARK-26726] Synchronize the amount of memory used by the 
broadcast variable to the UI display
d9403e4 is described below

commit d9403e47f5c04f3d3c3c3ea573d38c393c5a470b
Author: 韩田田00222924 
AuthorDate: Thu Jan 31 09:17:33 2019 -0800

[SPARK-26726] Synchronize the amount of memory used by the broadcast 
variable to the UI display

…not synchronized to the UI display

## What changes were proposed in this pull request?
The amount of memory used by broadcast variables was not reflected in the UI 
display.
This change adds a `BroadcastBlockId` case to the block-update match in 
`AppStatusListener` and updates the executor's memory usage accordingly.

## How was this patch tested?

We can test this patch with unit tests.

Closes #23649 from httfighter/SPARK-26726.

Lead-authored-by: 韩田田00222924 
Co-authored-by: han.tiant...@zte.com.cn 
Signed-off-by: Marcelo Vanzin 
(cherry picked from commit f4a17e916b729f9dc46e859b50a416db1e37b92e)
Signed-off-by: Marcelo Vanzin 
---
 .../apache/spark/status/AppStatusListener.scala| 44 +-
 .../spark/status/AppStatusListenerSuite.scala  | 18 +
 2 files changed, 53 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 5b564ef..c4dd47d 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -750,6 +750,7 @@ private[spark] class AppStatusListener(
 event.blockUpdatedInfo.blockId match {
   case block: RDDBlockId => updateRDDBlock(event, block)
   case stream: StreamBlockId => updateStreamBlock(event, stream)
+  case broadcast: BroadcastBlockId => updateBroadcastBlock(event, 
broadcast)
   case _ =>
 }
   }
@@ -808,15 +809,7 @@ private[spark] class AppStatusListener(
 // Update the executor stats first, since they are used to calculate the 
free memory
 // on tracked RDD distributions.
 maybeExec.foreach { exec =>
-  if (exec.hasMemoryInfo) {
-if (storageLevel.useOffHeap) {
-  exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
-} else {
-  exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
-}
-  }
-  exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
-  exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
 }
 
 // Update the block entry in the RDD info, keeping track of the deltas 
above so that we
@@ -918,6 +911,39 @@ private[spark] class AppStatusListener(
 }
   }
 
+  private def updateBroadcastBlock(
+  event: SparkListenerBlockUpdated,
+  broadcast: BroadcastBlockId): Unit = {
+val executorId = event.blockUpdatedInfo.blockManagerId.executorId
+liveExecutors.get(executorId).foreach { exec =>
+  val now = System.nanoTime()
+  val storageLevel = event.blockUpdatedInfo.storageLevel
+
+  // Whether values are being added to or removed from the existing 
accounting.
+  val diskDelta = event.blockUpdatedInfo.diskSize * (if 
(storageLevel.useDisk) 1 else -1)
+  val memoryDelta = event.blockUpdatedInfo.memSize * (if 
(storageLevel.useMemory) 1 else -1)
+
+  updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
+  maybeUpdate(exec, now)
+}
+  }
+
+  private def updateExecutorMemoryDiskInfo(
+  exec: LiveExecutor,
+  storageLevel: StorageLevel,
+  memoryDelta: Long,
+  diskDelta: Long): Unit = {
+if (exec.hasMemoryInfo) {
+  if (storageLevel.useOffHeap) {
+exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
+  } else {
+exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
+  }
+}
+exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
+exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  }
+
   private def getOrCreateStage(info: StageInfo): LiveStage = {
 val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
   new Function[(Int, Int), LiveStage]() {
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index f34be48..6214089 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -939,6 +939,24 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
 intercept[NoSuchElementException] {
  

[spark] branch master updated: [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display

2019-01-31 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f4a17e9  [SPARK-26726] Synchronize the amount of memory used by the 
broadcast variable to the UI display
f4a17e9 is described below

commit f4a17e916b729f9dc46e859b50a416db1e37b92e
Author: 韩田田00222924 
AuthorDate: Thu Jan 31 09:17:33 2019 -0800

[SPARK-26726] Synchronize the amount of memory used by the broadcast 
variable to the UI display

…not synchronized to the UI display

## What changes were proposed in this pull request?
The amount of memory used by broadcast variables was not reflected in the UI 
display.
This change adds a `BroadcastBlockId` case to the block-update match in 
`AppStatusListener` and updates the executor's memory usage accordingly.

## How was this patch tested?

We can test this patch with unit tests.

Closes #23649 from httfighter/SPARK-26726.

Lead-authored-by: 韩田田00222924 
Co-authored-by: han.tiant...@zte.com.cn 
Signed-off-by: Marcelo Vanzin 
---
 .../apache/spark/status/AppStatusListener.scala| 44 +-
 .../spark/status/AppStatusListenerSuite.scala  | 18 +
 2 files changed, 53 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index f69c7dd..3089f05 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -829,6 +829,7 @@ private[spark] class AppStatusListener(
 event.blockUpdatedInfo.blockId match {
   case block: RDDBlockId => updateRDDBlock(event, block)
   case stream: StreamBlockId => updateStreamBlock(event, stream)
+  case broadcast: BroadcastBlockId => updateBroadcastBlock(event, 
broadcast)
   case _ =>
 }
   }
@@ -887,15 +888,7 @@ private[spark] class AppStatusListener(
 // Update the executor stats first, since they are used to calculate the 
free memory
 // on tracked RDD distributions.
 maybeExec.foreach { exec =>
-  if (exec.hasMemoryInfo) {
-if (storageLevel.useOffHeap) {
-  exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
-} else {
-  exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
-}
-  }
-  exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
-  exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
 }
 
 // Update the block entry in the RDD info, keeping track of the deltas 
above so that we
@@ -997,6 +990,39 @@ private[spark] class AppStatusListener(
 }
   }
 
+  private def updateBroadcastBlock(
+  event: SparkListenerBlockUpdated,
+  broadcast: BroadcastBlockId): Unit = {
+val executorId = event.blockUpdatedInfo.blockManagerId.executorId
+liveExecutors.get(executorId).foreach { exec =>
+  val now = System.nanoTime()
+  val storageLevel = event.blockUpdatedInfo.storageLevel
+
+  // Whether values are being added to or removed from the existing 
accounting.
+  val diskDelta = event.blockUpdatedInfo.diskSize * (if 
(storageLevel.useDisk) 1 else -1)
+  val memoryDelta = event.blockUpdatedInfo.memSize * (if 
(storageLevel.useMemory) 1 else -1)
+
+  updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
+  maybeUpdate(exec, now)
+}
+  }
+
+  private def updateExecutorMemoryDiskInfo(
+  exec: LiveExecutor,
+  storageLevel: StorageLevel,
+  memoryDelta: Long,
+  diskDelta: Long): Unit = {
+if (exec.hasMemoryInfo) {
+  if (storageLevel.useOffHeap) {
+exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
+  } else {
+exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
+  }
+}
+exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
+exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  }
+
   private def getOrCreateStage(info: StageInfo): LiveStage = {
 val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
   new Function[(Int, Int), LiveStage]() {
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 356e6d1..9469a46 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -938,6 +938,24 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
 intercept[NoSuchElementException] {
   check[StreamBlockData](stream1.name) { _ => () }
 }
+
+// Update a BroadcastBlock.
+val broadcast1 

[spark] branch master updated: [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC

2019-01-31 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new df4c53e  [SPARK-26673][SQL] File source V2 writes: create framework 
and migrate ORC
df4c53e is described below

commit df4c53e44bc9837a470ec66486237403868cb04f
Author: Gengliang Wang 
AuthorDate: Thu Jan 31 21:29:01 2019 +0800

[SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC

## What changes were proposed in this pull request?

Create a framework for write path of File Source V2.
Also, migrate write path of ORC to V2.

Supported:
* Write to file as DataFrame

Not Supported:
* Partitioning, which is still under development in the data source V2 
project.
* Bucketing, which is still under development in the data source V2 project.
* Catalog.
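
The diff below also adds `spark.sql.sources.write.useV1SourceList`, a
comma-separated list of source names whose writes fall back to V1. How such
a list might be checked can be sketched as follows. `V1FallbackSketch` and
its methods are hypothetical, and the case-insensitive matching is an
assumption rather than something the commit states.

```scala
import java.util.Locale

object V1FallbackSketch {
  // Split the comma-separated config value into normalized, non-empty names.
  def parseList(confValue: String): Set[String] =
    confValue.toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty).toSet

  // Fall back to V1 when either the short name or the class name is listed.
  def useV1Writer(confValue: String, shortName: String, className: String): Boolean = {
    val disabled = parseList(confValue)
    disabled.contains(shortName.toLowerCase(Locale.ROOT)) ||
      disabled.contains(className.toLowerCase(Locale.ROOT))
  }
}
```

With the default empty string, no source is listed and nothing falls back.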

## How was this patch tested?

Unit test

Closes #23601 from gengliangwang/orc_write.

Authored-by: Gengliang Wang 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|  10 ++
 .../org/apache/spark/sql/DataFrameWriter.scala |  17 ++-
 .../sql/execution/datasources/DataSource.scala |   2 +-
 .../datasources/FallbackOrcDataSourceV2.scala  |   4 +-
 .../datasources/FileFormatDataWriter.scala |   6 +-
 .../execution/datasources/FileFormatWriter.scala   |   2 +-
 .../datasources/orc/OrcOutputWriter.scala  |   2 +-
 .../execution/datasources/v2/FileBatchWrite.scala  |  53 +++
 .../sql/execution/datasources/v2/FileTable.scala   |   5 +-
 .../datasources/v2/FileWriteBuilder.scala  | 158 +
 .../datasources/v2/FileWriterFactory.scala |  58 
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  13 +-
 .../execution/datasources/v2/orc/OrcTable.scala|   4 +
 .../datasources/v2/orc/OrcWriteBuilder.scala   |  66 +
 .../spark/sql/FileBasedDataSourceSuite.scala   |  67 -
 .../sources/v2/FileDataSourceV2FallBackSuite.scala |  67 -
 16 files changed, 486 insertions(+), 48 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4484273..11e1a5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1440,6 +1440,14 @@ object SQLConf {
 .stringConf
 .createWithDefault("")
 
+  val USE_V1_SOURCE_WRITER_LIST = 
buildConf("spark.sql.sources.write.useV1SourceList")
+.internal()
+.doc("A comma-separated list of data source short names or fully qualified 
data source" +
+  " register class names for which data source V2 write paths are 
disabled. Writes from these" +
+  " sources will fall back to the V1 sources.")
+.stringConf
+.createWithDefault("")
+
   val DISABLED_V2_STREAMING_WRITERS = 
buildConf("spark.sql.streaming.disabledV2Writers")
 .doc("A comma-separated list of fully qualified data source register class 
names for which" +
   " StreamWriteSupport is disabled. Writes to these sources will fall back 
to the V1 Sinks.")
@@ -2026,6 +2034,8 @@ class SQLConf extends Serializable with Logging {
 
   def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)
 
+  def userV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)
+
   def disabledV2StreamingWriters: String = 
getConf(DISABLED_V2_STREAMING_WRITERS)
 
   def disabledV2StreamingMicroBatchReaders: String =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d9404cd..e5f9473 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2Utils, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
@@ -243,8 +243,19 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 assertNotBucketed("save")
 
 val session = df.sparkSession
-val cls = DataSource.lookupDataSource(source, 

[spark] branch master updated: [SPARK-19591][ML][MLLIB][FOLLOWUP] Add sample weights to decision trees - fix tolerance

2019-01-31 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b3b62ba  [SPARK-19591][ML][MLLIB][FOLLOWUP] Add sample weights to 
decision trees - fix tolerance
b3b62ba is described below

commit b3b62ba303af9daad4826d274856c61acb88a6a1
Author: Ilya Matiach 
AuthorDate: Thu Jan 31 05:44:55 2019 -0600

[SPARK-19591][ML][MLLIB][FOLLOWUP] Add sample weights to decision trees - 
fix tolerance

This is a follow-up to PR:
https://github.com/apache/spark/pull/21632

## What changes were proposed in this pull request?

This PR tunes the tolerance used for deciding whether to add zero feature 
values to a value-count map (where the key is the feature value and the value 
is the weighted count of those feature values).
In the previous PR the tolerance scaled by the square of the unweighted 
number of samples, which is too aggressive for a large number of unweighted 
samples.  Unfortunately using just "Utils.EPSILON * unweightedNumSamples" is 
not enough either, so I multiplied that by a factor tuned by the testing 
procedure below.
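
To illustrate the scaling argument, here is a small sketch comparing the two
formulas. It assumes `Utils.EPSILON` is the machine epsilon for `Double`
(`math.ulp(1.0)`); `ToleranceSketch` and its method names are hypothetical.

```scala
object ToleranceSketch {
  // Assumption: stands in for Utils.EPSILON (machine epsilon for Double).
  val Epsilon: Double = math.ulp(1.0)

  // Previous formula: grows with the square of the sample count.
  def oldTolerance(unweightedNumSamples: Long): Double =
    Epsilon * unweightedNumSamples * unweightedNumSamples

  // New formula: grows linearly, scaled by the tuned constant factor 100.
  def newTolerance(unweightedNumSamples: Long): Double =
    Epsilon * unweightedNumSamples * 100

  // A zero-value bucket is added only when the missing weight exceeds the tolerance.
  def addsZeroBucket(weighted: Double, part: Double, tolerance: Double): Boolean =
    weighted - part > tolerance
}
```

For one million samples the old tolerance is about 2.2e-4 while the new one
is about 2.2e-8, so a missing weight of 1e-5 is ignored under the old
formula but counted under the new one.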

## How was this patch tested?

This involved manually running the sample weight tests for decision tree 
regressor to see whether the tolerance was large enough to exclude zero feature 
values.

E.g., in SBT:
```
./build/sbt
> project mllib
> testOnly *DecisionTreeRegressorSuite -- -z "training with sample weights"
```

For validation, I added a print inside the if in the code below and 
validated that the tolerance was large enough so that we would not include zero 
features (which don't exist in that test):
```
val valueCountMap = if (weightedNumSamples - partNumSamples > tolerance) {
  print("should not print this")
  partValueCountMap + (0.0 -> (weightedNumSamples - partNumSamples))
} else {
  partValueCountMap
}
```

Closes #23682 from imatiach-msft/ilmat/sample-weights-tol.

Authored-by: Ilya Matiach 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala  | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index fb4c321..b041dd4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -1050,8 +1050,11 @@ private[spark] object RandomForest extends Logging with Serializable {
   // Calculate the expected number of samples for finding splits
   val weightedNumSamples = samplesFractionForFindSplits(metadata) *
 metadata.weightedNumExamples
+  // scale tolerance by number of samples with constant factor
+  // Note: constant factor was tuned by running some tests where there were no zero
+  // feature values and validating we are never within tolerance
+  val tolerance = Utils.EPSILON * unweightedNumSamples * 100
   // add expected zero value count and get complete statistics
-  val tolerance = Utils.EPSILON * unweightedNumSamples * unweightedNumSamples
   val valueCountMap = if (weightedNumSamples - partNumSamples > tolerance) {
 partValueCountMap + (0.0 -> (weightedNumSamples - partNumSamples))
   } else {





[spark] branch master updated: [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off

2019-01-31 Thread gurwls223

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bc6f191  [SPARK-24779][R] Add map_concat / map_from_entries / an 
option in months_between UDF to disable rounding-off
bc6f191 is described below

commit bc6f19145192835cdfa4fc263b1c35b294c1e0ac
Author: Huaxin Gao 
AuthorDate: Thu Jan 31 19:38:32 2019 +0800

[SPARK-24779][R] Add map_concat / map_from_entries / an option in 
months_between UDF to disable rounding-off

## What changes were proposed in this pull request?

Add R versions of `map_concat` and `map_from_entries`, and an option in the
`months_between` UDF to disable rounding off.
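
As a rough plain-Scala model of what these three additions do (not the SparkR or Spark SQL implementation): `MapFunctionsSketch` and its methods are hypothetical names, and the duplicate-key behaviour shown for `mapConcat` is an assumption of this sketch that does not model how Spark treats duplicate keys.

```scala
object MapFunctionsSketch {
  // Toy model of map_concat: union several maps into one.
  // Assumption: later maps win on duplicate keys; Spark's own
  // duplicate-key handling is not modeled here.
  def mapConcat[K, V](maps: Map[K, V]*): Map[K, V] =
    maps.foldLeft(Map.empty[K, V])(_ ++ _)

  // Toy model of map_from_entries: build a map from (key, value) entries.
  def mapFromEntries[K, V](entries: Seq[(K, V)]): Map[K, V] =
    entries.toMap

  // Toy model of the rounding option: round a month difference to
  // 8 decimal digits unless rounding is disabled.
  def roundMonths(months: Double, roundOff: Boolean = true): Double =
    if (roundOff) math.round(months * 1e8) / 1e8 else months

  def main(args: Array[String]): Unit = {
    println(mapConcat(Map("a" -> 1), Map("b" -> 2)))
    println(mapFromEntries(Seq("a" -> 1, "b" -> 2)))
    println(roundMonths(1.0 / 3.0))
    println(roundMonths(1.0 / 3.0, roundOff = false))
  }
}
```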

## How was this patch tested?

Added tests in test_sparkSQL.R.

Closes #21835 from huaxingao/spark-24779.

Authored-by: Huaxin Gao 
Signed-off-by: Hyukjin Kwon 
---
 R/pkg/NAMESPACE   |  2 ++
 R/pkg/R/functions.R   | 60 +++
 R/pkg/R/generics.R| 10 +-
 R/pkg/tests/fulltests/test_sparkSQL.R | 22 +
 4 files changed, 87 insertions(+), 7 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cfad20d..1dcad16 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -312,8 +312,10 @@ exportMethods("%<=>%",
   "lower",
   "lpad",
   "ltrim",
+  "map_concat",
   "map_entries",
   "map_from_arrays",
+  "map_from_entries",
   "map_keys",
   "map_values",
   "max",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 58fc410..8f425b1 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -80,6 +80,11 @@ NULL
 #'  \item \code{from_utc_timestamp}, \code{to_utc_timestamp}: time zone to use.
 #'  \item \code{next_day}: day of the week string.
 #'  }
+#' @param ... additional argument(s).
+#'  \itemize{
+#'  \item \code{months_between}, this contains an optional parameter to specify the
+#'  the result is rounded off to 8 digits.
+#'  }
 #'
 #' @name column_datetime_diff_functions
 #' @rdname column_datetime_diff_functions
@@ -217,6 +222,7 @@ NULL
 #'  additional named properties to control how it is converted and accepts the
 #'  same options as the CSV data source.
 #'  \item \code{arrays_zip}, this contains additional Columns of arrays to be merged.
+#'  \item \code{map_concat}, this contains additional Columns of maps to be unioned.
 #'  }
 #' @name column_collection_functions
 #' @rdname column_collection_functions
@@ -229,7 +235,7 @@ NULL
 #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1), shuffle(tmp$v1)))
 #' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1), array_distinct(tmp$v1)))
 #' head(select(tmp, array_position(tmp$v1, 21), array_repeat(df$mpg, 3), array_sort(tmp$v1)))
-#' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1), array_remove(tmp$v1, 21)))
+#' head(select(tmp, reverse(tmp$v1), array_remove(tmp$v1, 21)))
 #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
 #' head(tmp2)
 #' head(select(tmp, posexplode(tmp$v1)))
@@ -238,15 +244,21 @@ NULL
 #' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
 #' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
 #' head(select(tmp3, map_entries(tmp3$v3), map_keys(tmp3$v3), map_values(tmp3$v3)))
-#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))
+#' head(select(tmp3, element_at(tmp3$v3, "Valiant"), map_concat(tmp3$v3, tmp3$v3)))
 #' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp))
 #' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5)))
 #' head(select(tmp4, array_except(tmp4$v4, tmp4$v5), array_intersect(tmp4$v4, tmp4$v5)))
 #' head(select(tmp4, array_union(tmp4$v4, tmp4$v5)))
-#' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5), map_from_arrays(tmp4$v4, tmp4$v5)))
+#' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5)))
 #' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))
 #' tmp5 <- mutate(df, v6 = create_array(df$model, df$model))
-#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))}
+#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))
+#' tmp6 <- mutate(df, v7 = create_array(create_array(df$model, df$model)))
+#' head(select(tmp6, flatten(tmp6$v7)))
+#' tmp7 <- mutate(df, v8 = create_array(df$model, df$cyl), v9 = create_array(df$model, df$hp))
+#' head(select(tmp7, map_from_arrays(tmp7$v8, tmp7$v9)))
+#' tmp8 <- mutate(df, v10 = create_array(struct(df$model, df$cyl)))
+#' head(select(tmp8, map_from_entries(tmp8$v10)))}
 NULL
 
 #' Window functions for Column operations
@@ -2074,15 +2086,21 @@ setMethod("levenshtein", signature(y = 

[spark] branch master updated: [SPARK-26448][SQL][FOLLOWUP] should not normalize grouping expressions for final aggregate

2019-01-31 Thread wenchen

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0e2c487  [SPARK-26448][SQL][FOLLOWUP] should not normalize grouping 
expressions for final aggregate
0e2c487 is described below

commit 0e2c4874596269dd835bf69a5592b316345597c5
Author: Wenchen Fan 
AuthorDate: Thu Jan 31 16:20:18 2019 +0800

[SPARK-26448][SQL][FOLLOWUP] should not normalize grouping expressions for 
final aggregate

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/23388 .

`AggUtils.createAggregate` is not the right place to normalize the grouping
expressions, as the final aggregate is also created by it. The grouping
expressions of the final aggregate should be attributes that refer to the
grouping expressions in the partial aggregate.

This PR moves the normalization to the caller side of `AggUtils`.
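
A toy model of why only the partial aggregate needs normalized keys, written in plain Scala rather than against Catalyst. Every name here (`TwoPhaseAggSketch`, `normalize`, `bits`, `partialCount`, `finalCount`) is hypothetical, and counting rows per key stands in for an arbitrary aggregate.

```scala
object TwoPhaseAggSketch {
  // Toy stand-in for floating-point normalization: map -0.0 to 0.0 and
  // any NaN to the canonical NaN, so equal keys share one bit pattern.
  def normalize(d: Double): Double =
    if (d.isNaN) Double.NaN else if (d == 0.0) 0.0 else d

  // Keys are compared by raw bits, as a binary row comparison would do.
  def bits(d: Double): Long = java.lang.Double.doubleToRawLongBits(d)

  // Partial aggregate: sees raw input values, so it normalizes the key.
  def partialCount(rows: Seq[Double]): Map[Long, Long] =
    rows.map(d => bits(normalize(d)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size.toLong }

  // Final aggregate: its keys only refer to partial output, which is
  // already normalized, so it merges counts without normalizing again.
  def finalCount(partials: Seq[Map[Long, Long]]): Map[Long, Long] =
    partials.flatten
      .groupBy(_._1)
      .map { case (key, kvs) => key -> kvs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Two input partitions that mix 0.0 and -0.0 as grouping keys.
    val partitions = Seq(Seq(0.0, -0.0, 1.5), Seq(-0.0, 1.5))
    val result = finalCount(partitions.map(partialCount))
    result.foreach { case (k, n) =>
      println(s"${java.lang.Double.longBitsToDouble(k)} -> $n")
    }
  }
}
```

In this model the final stage never touches the original values, which mirrors the point above: its grouping keys are references to what the partial stage produced.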

## How was this patch tested?

existing tests

Closes #23692 from cloud-fan/follow.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../optimizer/NormalizeFloatingNumbers.scala   | 16 --
 .../spark/sql/execution/SparkStrategies.scala  | 25 +++---
 .../spark/sql/execution/aggregate/AggUtils.scala   | 14 +++-
 3 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
index 520f24a..a5921eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
@@ -98,8 +98,10 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
   }
 
   private[sql] def normalize(expr: Expression): Expression = expr match {
-case _ if expr.dataType == FloatType || expr.dataType == DoubleType =>
-  NormalizeNaNAndZero(expr)
+case _ if !needNormalize(expr.dataType) => expr
+
+case a: Alias =>
+  a.withNewChildren(Seq(normalize(a.child)))
 
 case CreateNamedStruct(children) =>
   CreateNamedStruct(children.map(normalize))
@@ -113,22 +115,22 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
 case CreateMap(children) =>
   CreateMap(children.map(normalize))
 
-case a: Alias if needNormalize(a.dataType) =>
-  a.withNewChildren(Seq(normalize(a.child)))
+case _ if expr.dataType == FloatType || expr.dataType == DoubleType =>
+  NormalizeNaNAndZero(expr)
 
-case _ if expr.dataType.isInstanceOf[StructType] && needNormalize(expr.dataType) =>
+case _ if expr.dataType.isInstanceOf[StructType] =>
   val fields = expr.dataType.asInstanceOf[StructType].fields.indices.map { i =>
 normalize(GetStructField(expr, i))
   }
   CreateStruct(fields)
 
-case _ if expr.dataType.isInstanceOf[ArrayType] && needNormalize(expr.dataType) =>
+case _ if expr.dataType.isInstanceOf[ArrayType] =>
   val ArrayType(et, containsNull) = expr.dataType
   val lv = NamedLambdaVariable("arg", et, containsNull)
   val function = normalize(lv)
   ArrayTransform(expr, LambdaFunction(function, Seq(lv)))
 
-case _ => expr
+case _ => throw new IllegalStateException(s"fail to normalize $expr")
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b7cc373..edfa704 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -331,8 +332,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
 val stateVersion = conf.getConf(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION)
 
+// Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because
+// `groupingExpressions` is not extracted during logical phase.
+val normalizedGroupingExpressions = namedGroupingExpressions.map { e =>
+  NormalizeFloatingNumbers.normalize(e) match {
+case n: