[spark] branch master updated: [SPARK-26651][SQL][DOC] Collapse notes related to java.time API

2019-02-01 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b85974d  [SPARK-26651][SQL][DOC] Collapse notes related to java.time 
API
b85974d is described below

commit b85974db85a881a2e8aebc31cb4f578008648ab9
Author: Maxim Gekk 
AuthorDate: Sat Feb 2 11:17:33 2019 +0800

[SPARK-26651][SQL][DOC] Collapse notes related to java.time API

## What changes were proposed in this pull request?

Collapsed the notes about using the Java 8 time API for date/timestamp manipulation and
the Proleptic Gregorian calendar in the SQL migration guide.
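
For context, the notes being collapsed describe the stricter pattern matching of the java.time formatters. A minimal sketch of that strictness with plain java.time (no Spark involved), using the example timestamp and pattern quoted in the removed note below:

```
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object JavaTimeStrictness {
  def main(args: Array[String]): Unit = {
    // Pattern from the migration note: a literal 'T' separates date and time.
    val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")

    // Matches the pattern exactly, so it parses.
    println(LocalDateTime.parse("2018-12-08T10:39:21.123", fmt))

    // The space-separated form from the note does not match the pattern and
    // throws java.time.format.DateTimeParseException.
    println(LocalDateTime.parse("2018-12-08 10:39:21.123", fmt))
  }
}
```

In Spark 2.4 and earlier the same input was still accepted thanks to the legacy fallback parsers mentioned in the note.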

Closes #23722 from MaxGekk/collapse-notes.

Authored-by: Maxim Gekk 
Signed-off-by: Hyukjin Kwon 
---
 docs/sql-migration-guide-upgrade.md | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index 41f27a3..dbf9df0 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -31,14 +31,10 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, the `SET` command works without any 
warnings even if the specified key is for `SparkConf` entries and it has no 
effect because the command does not update `SparkConf`, but the behavior might 
confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You 
can disable such a check by setting 
`spark.sql.legacy.setCommandRejectsSparkCoreConfs` to `false`.
 
-  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and 
generating CSV/JSON content. In Spark version 2.4 and earlier, 
java.text.SimpleDateFormat is used for the same purpose with fallbacks to the 
parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` 
with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 
because the timestamp does not match to the pattern but it can be parsed by 
earlier Spark versions due to a fallback  [...]
-
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV 
string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
   - In Spark version 2.4 and earlier, JSON datasource and JSON functions like 
`from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
 
-  - Since Spark 3.0, the `unix_timestamp`, `date_format`, `to_unix_timestamp`, 
`from_unixtime`, `to_date`, `to_timestamp` functions use java.time API for 
parsing and formatting dates/timestamps from/to strings by using ISO chronology 
(https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html) 
based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, 
java.text.SimpleDateFormat and java.util.GregorianCalendar (hybrid calendar 
that supports both the Julian [...]
-
   - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer 
TimestampType from string values if they match to the pattern defined by the 
JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to 
disable such type inferring.
 
   - In PySpark, when Arrow optimization is enabled, if Arrow version is higher 
than 0.11.0, Arrow can perform safe type conversion when converting 
Pandas.Series to Arrow array during serialization. Arrow will raise errors when 
detecting unsafe type conversion like overflow. Setting 
`spark.sql.execution.pandas.arrowSafeTypeConversion` to true can enable it. The 
default setting is false. PySpark's behavior for Arrow versions is illustrated 
in the table below:
@@ -91,11 +87,15 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, if 
`org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with 
primitive-type argument, the returned UDF will return null if the input values 
is null. Since Spark 3.0, the UDF will return the default value of the Java 
type if the input value is null. For example, `val f = udf((x: Int) => x, 
IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column 
`x` is null, and return 0 in Spark 3.0. This behavior change is int [...]
 
-  - Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use 
java.time API for calculation week number of year and day number of week based 
on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid 
calendar (Julian + Gregorian) is used for the same purpose. Results of the 
functions returned by Spark 3.0 and previous versions can be 

[spark] branch master updated: [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1

2019-02-01 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 75ea89a  [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1
75ea89a is described below

commit 75ea89ad94ca76646e4697cf98c78d14c6e2695f
Author: Boris Shminke 
AuthorDate: Sat Feb 2 10:49:45 2019 +0800

[SPARK-18161][PYTHON] Update cloudpickle to v0.6.1

## What changes were proposed in this pull request?

In this PR we've done two things:
1) updated Spark's copy of cloudpickle to 0.6.1 (the current stable release).
The main reason Spark stayed on cloudpickle 0.4.x was that the default
pickle protocol was changed in later versions.

2) started using pickle.HIGHEST_PROTOCOL for both Python 2 and Python 3 for
serializers and broadcast. [Pyrolite](https://github.com/irmen/Pyrolite) supports
these pickle protocol versions: reading 0, 1, 2, 3, 4; writing 2 (see the sketch
below).
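
As a rough illustration of the JVM side of that contract, here is a hedged Scala sketch using Pyrolite's `Pickler`/`Unpickler` (assuming `net.razorvine.pickle` is on the classpath); it is not Spark code, just the round trip the protocol support above allows:

```
import net.razorvine.pickle.{Pickler, Unpickler}

object PyroliteRoundTrip {
  def main(args: Array[String]): Unit = {
    // Pyrolite writes pickle protocol 2 but reads protocols 0-4, so data
    // pickled on the Python side with pickle.HIGHEST_PROTOCOL can still be
    // unpickled on the JVM side.
    val pickler = new Pickler()
    val bytes: Array[Byte] = pickler.dumps(java.util.Arrays.asList(1, 2, 3))

    val unpickler = new Unpickler()
    println(unpickler.loads(bytes)) // [1, 2, 3]
  }
}
```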

## How was this patch tested?

Jenkins tests.

Authors: Sloane Simmons, Boris Shminke

This contribution is original work of Sloane Simmons and Boris Shminke and 
they licensed it to the project under the project's open source license.

Closes #20691 from inpefess/pickle_protocol_4.

Lead-authored-by: Boris Shminke 
Co-authored-by: singularperturbation 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/broadcast.py  |   4 +-
 python/pyspark/cloudpickle.py| 259 ---
 python/pyspark/serializers.py|   7 +-
 python/pyspark/tests/test_rdd.py |   2 +-
 4 files changed, 194 insertions(+), 78 deletions(-)

diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 43a5ead..cca64b5 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -23,7 +23,7 @@ import threading
 
 from pyspark.cloudpickle import print_exec
 from pyspark.java_gateway import local_connect_and_auth
-from pyspark.serializers import ChunkedStream
+from pyspark.serializers import ChunkedStream, pickle_protocol
 from pyspark.util import _exception_message
 
 if sys.version < '3':
@@ -109,7 +109,7 @@ class Broadcast(object):
 
 def dump(self, value, f):
 try:
-pickle.dump(value, f, 2)
+pickle.dump(value, f, pickle_protocol)
 except pickle.PickleError:
 raise
 except Exception as e:
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 88519d7..bf92569 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -42,20 +42,26 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 from __future__ import print_function
 
-import dis
-from functools import partial
-import imp
 import io
-import itertools
-import logging
+import dis
+import sys
+import types
 import opcode
-import operator
 import pickle
 import struct
-import sys
-import traceback
-import types
+import logging
 import weakref
+import operator
+import importlib
+import itertools
+import traceback
+from functools import partial
+
+
+# cloudpickle is meant for inter process communication: we expect all
+# communicating processes to run the same Python version hence we favor
+# communication speed over compatibility:
+DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL
 
 
 if sys.version < '3':
@@ -72,6 +78,22 @@ else:
 PY3 = True
 
 
+# Container for the global namespace to ensure consistent unpickling of
+# functions defined in dynamic modules (modules not registed in sys.modules).
+_dynamic_modules_globals = weakref.WeakValueDictionary()
+
+
+class _DynamicModuleFuncGlobals(dict):
+"""Global variables referenced by a function defined in a dynamic module
+
+To avoid leaking references we store such context in a WeakValueDictionary
+instance.  However instances of python builtin types such as dict cannot
+be used directly as values in such a construct, hence the need for a
+derived class.
+"""
+pass
+
+
 def _make_cell_set_template_code():
 """Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF
 
@@ -157,7 +179,7 @@ def cell_set(cell, value):
 )(value)
 
 
-#relevant opcodes
+# relevant opcodes
 STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
 DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
 LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
@@ -167,7 +189,7 @@ EXTENDED_ARG = dis.EXTENDED_ARG
 
 
 def islambda(func):
-return getattr(func,'__name__') == '<lambda>'
+return getattr(func, '__name__') == '<lambda>'
 
 
 _BUILTIN_TYPE_NAMES = {}
@@ -248,7 +270,9 @@ class CloudPickler(Pickler):
 dispatch = Pickler.dispatch.copy()
 
 def __init__(self, file, protocol=None):
-Pickler.__init__(self, file, protocol)
+if protocol is None:
+protocol = DEFAULT_PROTOCOL
+Pickler.__init__(self, file, protocol=protocol)
 # set of modules to 

[spark] branch master updated: [SPARK-26714][CORE][WEBUI] Show 0 partition job in WebUI

2019-02-01 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a0faabf  [SPARK-26714][CORE][WEBUI] Show 0 partition job in WebUI
a0faabf is described below

commit a0faabf7b5901866945a5b4c9ae973de266ed887
Author: xiaodeshan 
AuthorDate: Fri Feb 1 18:38:27 2019 -0600

[SPARK-26714][CORE][WEBUI] Show 0 partition job in WebUI

## What changes were proposed in this pull request?

When a job's partition count is zero, it still gets a job ID but is not shown
in the web UI, which is confusing. This PR shows such jobs in the UI.

Example:
In bash:
mkdir -p /home/test/testdir

sc.textFile("/home/test/testdir")

Some logs:

```
19/01/24 17:26:19 INFO FileInputFormat: Total input paths to process : 0
19/01/24 17:26:19 INFO SparkContext: Starting job: collect at 
WordCount.scala:9
19/01/24 17:26:19 INFO DAGScheduler: Job 0 finished: collect at 
WordCount.scala:9, took 0.003735 s
```
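
The same reproduction as a self-contained Scala sketch (the path is illustrative and assumed to be an existing, empty directory):

```
import org.apache.spark.{SparkConf, SparkContext}

object ZeroPartitionJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("zero-partition-job"))
    // An empty directory yields an RDD with zero partitions; collect() still
    // allocates a job id, which previously never appeared in the web UI.
    val rdd = sc.textFile("/home/test/testdir")
    println(rdd.getNumPartitions)  // 0
    println(rdd.collect().length)  // 0
    sc.stop()
  }
}
```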

## How was this patch tested?

UT

Closes #23637 from deshanxiao/spark-26714.

Authored-by: xiaodeshan 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +
 .../main/scala/org/apache/spark/status/AppStatusListener.scala   | 4 ++--
 core/src/main/scala/org/apache/spark/ui/UIUtils.scala| 3 ++-
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 9 +++--
 4 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 75eb37c..dd1b259 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -693,6 +693,11 @@ private[spark] class DAGScheduler(
 
 val jobId = nextJobId.getAndIncrement()
 if (partitions.size == 0) {
+  val time = clock.getTimeMillis()
+  listenerBus.post(
+SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+  listenerBus.post(
+SparkListenerJobEnd(jobId, time, JobSucceeded))
   // Return immediately if the job is running 0 tasks
   return new JobWaiter[U](this, jobId, 0, resultHandler)
 }
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 3089f05..a8b2153 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -321,7 +321,7 @@ private[spark] class AppStatusListener(
 }
 
 val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
-val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage 
Name)")
+val jobName = lastStageInfo.map(_.name).getOrElse("")
 val jobGroup = Option(event.properties)
   .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
 val sqlExecutionId = Option(event.properties)
@@ -329,7 +329,7 @@ private[spark] class AppStatusListener(
 
 val job = new LiveJob(
   event.jobId,
-  lastStageName,
+  jobName,
   if (event.time > 0) Some(new Date(event.time)) else None,
   event.stageIds,
   jobGroup,
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 9674350..54f2750 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -363,7 +363,8 @@ private[spark] object UIUtils extends Logging {
   skipped: Int,
   reasonToNumKilled: Map[String, Int],
   total: Int): Seq[Node] = {
-val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+val ratio = if (total == 0) 100.0 else (completed.toDouble/total)*100
+val completeWidth = "width: %s%%".format(ratio)
 // started + completed can be > total when there are speculative tasks
 val boundedStarted = math.min(started, total - completed)
 val startWidth = "width: %s%%".format((boundedStarted.toDouble/total)*100)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2c94853..e399f7e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -857,8 +857,13 @@ private[spark] object ApiHelper {
   }
 
   def lastStageNameAndDescription(store: AppStatusStore, job: JobData): 
(String, String) = {
-val stage = store.asOption(store.stageAttempt(job.stageIds.max, 0)._1)
-(stage.map(_.name).getOrElse(""), 

[spark] branch master updated: [MINOR][DOC] Writing to partitioned Hive metastore Parquet tables is not supported for Spark SQL

2019-02-01 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 421ff6f  [MINOR][DOC] Writing to partitioned Hive metastore Parquet 
tables is not supported for Spark SQL
421ff6f is described below

commit 421ff6f60e2f3da123fd941d9fa91d7228b21ebc
Author: liuxian 
AuthorDate: Fri Feb 1 18:34:13 2019 -0600

[MINOR][DOC] Writing to partitioned Hive metastore Parquet tables is not 
supported for Spark SQL

## What changes were proposed in this pull request?

Even if `spark.sql.hive.convertMetastoreParquet` is true, when writing to
partitioned Hive metastore Parquet tables Spark SQL still cannot use its own
Parquet support instead of Hive SerDe.

Related code:
 
https://github.com/apache/spark/blob/d53e11ffce3f721886918c1cb4525478971f02bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L198
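
A hedged sketch of the case the updated sentence describes; the table names are hypothetical and a Hive-enabled session is assumed:

```
import org.apache.spark.sql.SparkSession

object PartitionedParquetInsert {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned-parquet-insert")
      .enableHiveSupport()
      .getOrCreate()

    // The conversion is on by default; set it explicitly for clarity.
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

    // Reads (and writes to non-partitioned tables) use Spark's native Parquet
    // support, but this insert into a *partitioned* Hive metastore Parquet
    // table still goes through Hive SerDe, as the updated sentence states.
    spark.sql(
      "INSERT INTO TABLE partitioned_parquet_tbl PARTITION (dt = '2019-02-01') " +
        "SELECT id, name FROM src_tbl")

    spark.stop()
  }
}
```
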
## How was this patch tested?
N/A

Closes #23671 from 10110346/parquetdoc.

Authored-by: liuxian 
Signed-off-by: Sean Owen 
---
 docs/sql-data-sources-parquet.md | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md
index 5532bf9..f6e03fba 100644
--- a/docs/sql-data-sources-parquet.md
+++ b/docs/sql-data-sources-parquet.md
@@ -157,9 +157,10 @@ turned it off by default starting from 1.5.0. You may 
enable it by
 
 ### Hive metastore Parquet table conversion
 
-When reading from and writing to Hive metastore Parquet tables, Spark SQL will 
try to use its own
-Parquet support instead of Hive SerDe for better performance. This behavior is 
controlled by the
-`spark.sql.hive.convertMetastoreParquet` configuration, and is turned on by 
default.
+When reading from Hive metastore Parquet tables and writing to non-partitioned 
Hive metastore
+Parquet tables, Spark SQL will try to use its own Parquet support instead of 
Hive SerDe for
+better performance. This behavior is controlled by the 
`spark.sql.hive.convertMetastoreParquet`
+configuration, and is turned on by default.
 
  Hive/Parquet Schema Reconciliation
 





[spark] branch master updated: [SPARK-26771][CORE][GRAPHX] Make .unpersist(), .destroy() consistently non-blocking by default

2019-02-01 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8171b156 [SPARK-26771][CORE][GRAPHX] Make .unpersist(), .destroy() 
consistently non-blocking by default
8171b156 is described below

commit 8171b156ebf25a582318c71222716b5c52c8bbb3
Author: Sean Owen 
AuthorDate: Fri Feb 1 18:29:55 2019 -0600

[SPARK-26771][CORE][GRAPHX] Make .unpersist(), .destroy() consistently 
non-blocking by default

## What changes were proposed in this pull request?

Make .unpersist(), .destroy() non-blocking by default and adjust callers to 
request blocking only where important.

This also adds an optional blocking argument to PySpark's RDD.unpersist(),
which never had one.
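
From a caller's point of view, the new default looks like this (local-mode sketch, names illustrative):

```
import org.apache.spark.{SparkConf, SparkContext}

object UnpersistDefaults {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("unpersist-defaults"))

    val cached = sc.parallelize(1 to 1000).cache()
    cached.count()
    // With this change, unpersist() returns immediately by default ...
    cached.unpersist()

    val other = sc.parallelize(1 to 1000).cache()
    other.count()
    // ... and callers that need to wait until the blocks are actually
    // removed opt in to blocking explicitly.
    other.unpersist(blocking = true)

    sc.stop()
  }
}
```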

## How was this patch tested?

Existing tests.

Closes #23685 from srowen/SPARK-26771.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/MapOutputTracker.scala |  2 +-
 .../src/main/scala/org/apache/spark/SparkContext.scala |  2 +-
 .../scala/org/apache/spark/broadcast/Broadcast.scala   |  3 +--
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |  6 +++---
 .../spark/rdd/util/PeriodicRDDCheckpointer.scala   |  2 +-
 .../test/scala/org/apache/spark/DistributedSuite.scala |  2 +-
 .../test/scala/org/apache/spark/UnpersistSuite.scala   |  2 +-
 .../org/apache/spark/broadcast/BroadcastSuite.scala|  4 ++--
 .../scala/org/apache/spark/ui/UISeleniumSuite.scala|  4 ++--
 docs/graphx-programming-guide.md   |  2 +-
 docs/rdd-programming-guide.md  |  9 -
 .../spark/examples/mllib/BinaryClassification.scala|  2 +-
 .../spark/examples/mllib/DecisionTreeRunner.scala  |  2 +-
 .../apache/spark/examples/mllib/LinearRegression.scala |  2 +-
 .../org/apache/spark/examples/mllib/MovieLensALS.scala |  2 +-
 .../src/main/scala/org/apache/spark/graphx/Graph.scala |  8 ++--
 .../main/scala/org/apache/spark/graphx/Pregel.scala|  6 +++---
 .../org/apache/spark/graphx/impl/EdgeRDDImpl.scala |  2 +-
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala |  4 ++--
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala   |  2 +-
 .../scala/org/apache/spark/graphx/lib/PageRank.scala   | 10 +-
 .../spark/graphx/lib/StronglyConnectedComponents.scala |  2 +-
 .../spark/graphx/util/PeriodicGraphCheckpointer.scala  |  2 +-
 .../scala/org/apache/spark/graphx/GraphSuite.scala | 10 +-
 .../org/apache/spark/ml/classification/LinearSVC.scala |  2 +-
 .../spark/ml/classification/LogisticRegression.scala   |  2 +-
 .../apache/spark/ml/clustering/GaussianMixture.scala   |  6 +++---
 .../apache/spark/ml/optim/loss/RDDLossFunction.scala   |  2 +-
 .../spark/ml/regression/AFTSurvivalRegression.scala|  4 ++--
 .../apache/spark/ml/regression/LinearRegression.scala  |  4 ++--
 .../spark/ml/tree/impl/GradientBoostedTrees.scala  |  2 +-
 .../org/apache/spark/ml/tree/impl/NodeIdCache.scala|  6 +++---
 .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 16 
 .../spark/mllib/clustering/BisectingKMeans.scala   |  8 
 .../spark/mllib/clustering/GaussianMixture.scala   |  2 +-
 .../org/apache/spark/mllib/clustering/KMeans.scala | 12 ++--
 .../apache/spark/mllib/clustering/KMeansModel.scala|  2 +-
 .../org/apache/spark/mllib/clustering/LDAModel.scala   |  2 +-
 .../apache/spark/mllib/clustering/LDAOptimizer.scala   |  2 +-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  | 10 +-
 .../scala/org/apache/spark/mllib/fpm/PrefixSpan.scala  |  2 +-
 .../spark/mllib/optimization/GradientDescent.scala |  2 +-
 .../org/apache/spark/mllib/optimization/LBFGS.scala|  2 +-
 .../mllib/regression/GeneralizedLinearAlgorithm.scala  |  2 +-
 .../spark/mllib/tree/model/treeEnsembleModels.scala|  2 +-
 .../spark/ml/classification/LinearSVCSuite.scala   |  4 ++--
 .../ml/classification/LogisticRegressionSuite.scala|  6 +++---
 python/pyspark/broadcast.py| 11 +++
 python/pyspark/rdd.py  |  8 ++--
 python/pyspark/tests/test_rdd.py   |  2 +-
 .../org/apache/spark/sql/DataFrameFunctionsSuite.scala |  2 +-
 .../scala/org/apache/spark/sql/DatasetCacheSuite.scala | 18 +-
 .../columnar/InMemoryColumnarQuerySuite.scala  |  6 +++---
 .../org/apache/spark/sql/hive/CachedTableSuite.scala   |  2 +-
 .../org/apache/spark/streaming/dstream/DStream.scala   |  2 +-
 55 files changed, 131 insertions(+), 114 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a8b8e96..803703f 100644
--- 

[spark] branch master updated: [SPARK-26754][PYTHON] Add hasTrainingSummary to replace duplicate code in PySpark

2019-02-01 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5bb9647  [SPARK-26754][PYTHON] Add hasTrainingSummary to replace 
duplicate code in PySpark
5bb9647 is described below

commit 5bb9647e1019ea7eb17af7d2057fdacb7f4c560b
Author: Huaxin Gao 
AuthorDate: Fri Feb 1 17:29:58 2019 -0600

[SPARK-26754][PYTHON] Add hasTrainingSummary to replace duplicate code in 
PySpark

## What changes were proposed in this pull request?

Python version of https://github.com/apache/spark/pull/17654

## How was this patch tested?

Existing Python unit test

Closes #23676 from huaxingao/spark26754.

Authored-by: Huaxin Gao 
Signed-off-by: Sean Owen 
---
 python/pyspark/ml/classification.py | 19 ++-
 python/pyspark/ml/clustering.py | 37 ++---
 python/pyspark/ml/regression.py | 30 ++
 python/pyspark/ml/util.py   | 26 ++
 4 files changed, 44 insertions(+), 68 deletions(-)

diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index 89b9278..134b9e0 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -483,7 +483,8 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredicti
 return self.getOrDefault(self.upperBoundsOnIntercepts)
 
 
-class LogisticRegressionModel(JavaModel, JavaClassificationModel, 
JavaMLWritable, JavaMLReadable):
+class LogisticRegressionModel(JavaModel, JavaClassificationModel, 
JavaMLWritable, JavaMLReadable,
+  HasTrainingSummary):
 """
 Model fitted by LogisticRegression.
 
@@ -532,24 +533,16 @@ class LogisticRegressionModel(JavaModel, 
JavaClassificationModel, JavaMLWritable
 trained on the training set. An exception is thrown if 
`trainingSummary is None`.
 """
 if self.hasSummary:
-java_lrt_summary = self._call_java("summary")
 if self.numClasses <= 2:
-return 
BinaryLogisticRegressionTrainingSummary(java_lrt_summary)
+return 
BinaryLogisticRegressionTrainingSummary(super(LogisticRegressionModel,
+ 
self).summary)
 else:
-return LogisticRegressionTrainingSummary(java_lrt_summary)
+return 
LogisticRegressionTrainingSummary(super(LogisticRegressionModel,
+   self).summary)
 else:
 raise RuntimeError("No training summary available for this %s" %
self.__class__.__name__)
 
-@property
-@since("2.0.0")
-def hasSummary(self):
-"""
-Indicates whether a training summary exists for this model
-instance.
-"""
-return self._call_java("hasSummary")
-
 @since("2.0.0")
 def evaluate(self, dataset):
 """
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index b9c6bdf..864e2a3 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -97,7 +97,7 @@ class ClusteringSummary(JavaWrapper):
 return self._call_java("numIter")
 
 
-class GaussianMixtureModel(JavaModel, JavaMLWritable, JavaMLReadable):
+class GaussianMixtureModel(JavaModel, JavaMLWritable, JavaMLReadable, 
HasTrainingSummary):
 """
 Model fitted by GaussianMixture.
 
@@ -126,22 +126,13 @@ class GaussianMixtureModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 @property
 @since("2.1.0")
-def hasSummary(self):
-"""
-Indicates whether a training summary exists for this model
-instance.
-"""
-return self._call_java("hasSummary")
-
-@property
-@since("2.1.0")
 def summary(self):
 """
 Gets summary (e.g. cluster assignments, cluster sizes) of the model 
trained on the
 training set. An exception is thrown if no summary exists.
 """
 if self.hasSummary:
-return GaussianMixtureSummary(self._call_java("summary"))
+return GaussianMixtureSummary(super(GaussianMixtureModel, 
self).summary)
 else:
 raise RuntimeError("No training summary available for this %s" %
self.__class__.__name__)
@@ -323,7 +314,7 @@ class KMeansSummary(ClusteringSummary):
 return self._call_java("trainingCost")
 
 
-class KMeansModel(JavaModel, GeneralJavaMLWritable, JavaMLReadable):
+class KMeansModel(JavaModel, GeneralJavaMLWritable, JavaMLReadable, 
HasTrainingSummary):
 """
 Model fitted by KMeans.
 
@@ -337,21 +328,13 @@ class KMeansModel(JavaModel, 

[spark] branch test-branch deleted (was 0f8b07e)

2019-02-01 Thread rxin
This is an automated email from the ASF dual-hosted git repository.

rxin pushed a change to branch test-branch
in repository https://gitbox.apache.org/repos/asf/spark.git.


 was 0f8b07e  test

This change permanently discards the following revisions:

 discard 0f8b07e  test





[spark] branch test-branch created (now 0f8b07e)

2019-02-01 Thread rxin
This is an automated email from the ASF dual-hosted git repository.

rxin pushed a change to branch test-branch
in repository https://gitbox.apache.org/repos/asf/spark.git.


  at 0f8b07e  test

This branch includes the following new commits:

 new 0f8b07e  test

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.






[spark] 01/01: test

2019-02-01 Thread rxin
This is an automated email from the ASF dual-hosted git repository.

rxin pushed a commit to branch test-branch
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 0f8b07e5034af2819b75b53aadffda82ae0c31b8
Author: Reynold Xin 
AuthorDate: Fri Feb 1 13:28:18 2019 -0800

test
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 271f2f5..2c1e02a 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,7 @@ For general development tips, including info on developing 
Spark using an IDE, s
 
 The easiest way to start using Spark is through the Scala shell:
 
-./bin/spark-shell
+./bin/spark-shella
 
 Try the following command, which should return 1000:
 





[spark] branch branch-2.2 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
 new 7c7d7f6  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
7c7d7f6 is described below

commit 7c7d7f6a878b02ece881266ee538f3e1443aa8c1
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This will make `avg` become `NaN`. And whatever gets merged with the 
result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call 
`NaN.toLong` and get `0`, and the user will see the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.
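
A minimal sketch of why the old formula yields `NaN` for `zero.merge(zero)`, assuming the zero stats carry `avg = 0.0` and `count = 0`:

```
object ZeroMergeNaN {
  def main(args: Array[String]): Unit = {
    // The old merge body, applied to zero.merge(zero):
    var avg = 0.0
    var count = 0L
    val thatAvg = 0.0
    val thatCount = 0L

    count += thatCount                          // still 0
    avg += (thatAvg - avg) * thatCount / count  // 0.0 / 0 => NaN
    println(avg)                                // NaN
    println(avg.toLong)                         // 0, i.e. the 1970-01-01 epoch
  }
}
```

Once `avg` is `NaN`, every later merge stays `NaN`, and `NaN.toLong` is `0`, which is how the epoch timestamps end up in the report above.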

## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 55e7508..4069633 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 4f19fa0..14a193f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 10000000000L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge non-zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+stats.merge(stats2)
+ 

[spark] branch branch-2.3 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new a5d22da  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
a5d22da is described below

commit a5d22da1888b8110b490d52d2c36b3fc907254f6
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This will make `avg` become `NaN`. And whatever gets merged with the 
result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call 
`NaN.toLong` and get `0`, and the user will see the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.

## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index d6bef9c..a51f086 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 10000000000L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge non-zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+stats.merge(stats2)
+ 

[spark] branch branch-2.4 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new bd4ce51  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
bd4ce51 is described below

commit bd4ce51e699da306bc36db0c7b0303b6e6c3d4df
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This will make `avg` become `NaN`. And whatever gets merged with the 
result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call 
`NaN.toLong` and get `0`, and the user will see the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.

## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 026af17..091b9a1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -43,9 +43,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -62,7 +62,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 10000000000L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -80,6 +79,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge non-zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+stats.merge(stats2)
+ 

[spark] branch master updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 03a928c  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
03a928c is described below

commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This will make `avg` become `NaN`. And whatever gets merged with the 
result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call 
`NaN.toLong` and get `0`, and the user will see the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.

## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c696204..b79770a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -43,9 +43,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -62,7 +62,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 10000000000L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -80,6 +79,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge non-zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+