spark git commit: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSplit param persist/load bug

2017-09-22 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 3e6a714c9 -> f180b6534


[SPARK-22060][ML] Fix CrossValidator/TrainValidationSplit param persist/load bug

## What changes were proposed in this pull request?

Currently the param of CrossValidator/TrainValidationSplit persist/loading is 
hardcoding, which is different with other ML estimators. This cause persist bug 
for new added `parallelism` param.

I refactor related code, avoid hardcoding persist/load param. And in the same 
time, it solve the `parallelism` persisting bug.

This refactoring is very useful because we will add more new params in #19208 , 
hardcoding param persisting/loading making the thing adding new params very 
troublesome.

## How was this patch tested?

Test added.

Author: WeichenXu 

Closes #19278 from WeichenXu123/fix-tuning-param-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f180b653
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f180b653
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f180b653

Branch: refs/heads/master
Commit: f180b65343e706c60b995a3d46d0391612bda966
Parents: 3e6a714
Author: WeichenXu 
Authored: Fri Sep 22 18:15:01 2017 -0700
Committer: Joseph K. Bradley 
Committed: Fri Sep 22 18:15:01 2017 -0700

--
 .../apache/spark/ml/tuning/CrossValidator.scala | 17 +++
 .../spark/ml/tuning/TrainValidationSplit.scala  | 18 
 .../spark/ml/tuning/ValidatorParams.scala   | 22 +++-
 .../org/apache/spark/ml/util/ReadWrite.scala| 20 +-
 .../spark/ml/tuning/CrossValidatorSuite.scala   |  3 +++
 .../ml/tuning/TrainValidationSplitSuite.scala   |  4 +++-
 6 files changed, 46 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f180b653/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index ce2a3a2..7c81cb9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -212,14 +212,13 @@ object CrossValidator extends MLReadable[CrossValidator] {
 
   val (metadata, estimator, evaluator, estimatorParamMaps) =
 ValidatorParams.loadImpl(path, sc, className)
-  val numFolds = (metadata.params \ "numFolds").extract[Int]
-  val seed = (metadata.params \ "seed").extract[Long]
-  new CrossValidator(metadata.uid)
+  val cv = new CrossValidator(metadata.uid)
 .setEstimator(estimator)
 .setEvaluator(evaluator)
 .setEstimatorParamMaps(estimatorParamMaps)
-.setNumFolds(numFolds)
-.setSeed(seed)
+  DefaultParamsReader.getAndSetParams(cv, metadata,
+skipParams = Option(List("estimatorParamMaps")))
+  cv
 }
   }
 }
@@ -302,17 +301,17 @@ object CrossValidatorModel extends 
MLReadable[CrossValidatorModel] {
 
   val (metadata, estimator, evaluator, estimatorParamMaps) =
 ValidatorParams.loadImpl(path, sc, className)
-  val numFolds = (metadata.params \ "numFolds").extract[Int]
-  val seed = (metadata.params \ "seed").extract[Long]
   val bestModelPath = new Path(path, "bestModel").toString
   val bestModel = 
DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc)
   val avgMetrics = (metadata.metadata \ 
"avgMetrics").extract[Seq[Double]].toArray
+
   val model = new CrossValidatorModel(metadata.uid, bestModel, avgMetrics)
   model.set(model.estimator, estimator)
 .set(model.evaluator, evaluator)
 .set(model.estimatorParamMaps, estimatorParamMaps)
-.set(model.numFolds, numFolds)
-.set(model.seed, seed)
+  DefaultParamsReader.getAndSetParams(model, metadata,
+skipParams = Option(List("estimatorParamMaps")))
+  model
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f180b653/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
index 16db0f5..6e3ad40 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.tuning
 
+import java.io.IOException
 import java.util.{List => JList}
 
 import 

[2/2] spark git commit: Preparing development version 2.1.3-SNAPSHOT

2017-09-22 Thread holden
Preparing development version 2.1.3-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03db7214
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03db7214
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03db7214

Branch: refs/heads/branch-2.1
Commit: 03db7214931a6a082154c9acf50d976687500670
Parents: fabbb7f
Author: Holden Karau 
Authored: Fri Sep 22 08:07:45 2017 -0700
Committer: Holden Karau 
Committed: Fri Sep 22 08:07:45 2017 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mesos/pom.xml | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 python/pyspark/version.py | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 899d410..6c380b6 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.1.2
+Version: 2.1.3
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 133f8e6..e9f915a 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index d2631e4..7e203e7 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index c12d480..92dd275 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index d22db36..abca418 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 

[1/2] spark git commit: Preparing Spark release v2.1.2-rc2

2017-09-22 Thread holden
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d930bbb40 -> 03db72149


Preparing Spark release v2.1.2-rc2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fabbb7f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fabbb7f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fabbb7f5

Branch: refs/heads/branch-2.1
Commit: fabbb7f59e47590114366d14e15fbbff8c88593c
Parents: d930bbb
Author: Holden Karau 
Authored: Fri Sep 22 08:07:37 2017 -0700
Committer: Holden Karau 
Committed: Fri Sep 22 08:07:37 2017 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mesos/pom.xml | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 python/pyspark/version.py | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 6c380b6..899d410 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.1.3
+Version: 2.1.2
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index e9f915a..133f8e6 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.3-SNAPSHOT
+2.1.2
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 7e203e7..d2631e4 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.3-SNAPSHOT
+2.1.2
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 92dd275..c12d480 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.1.3-SNAPSHOT
+2.1.2
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index abca418..d22db36 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 

[spark] Git Push Summary

2017-09-22 Thread holden
Repository: spark
Updated Tags:  refs/tags/v2.1.2-rc2 [created] fabbb7f59

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21766][PYSPARK][SQL] DataFrame toPandas() raises ValueError with nullable int columns

2017-09-22 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master d2b2932d8 -> 3e6a714c9


[SPARK-21766][PYSPARK][SQL] DataFrame toPandas() raises ValueError with 
nullable int columns

## What changes were proposed in this pull request?

When calling `DataFrame.toPandas()` (without Arrow enabled), if there is a 
`IntegralType` column (`IntegerType`, `ShortType`, `ByteType`) that has null 
values the following exception is thrown:

ValueError: Cannot convert non-finite values (NA or inf) to integer

This is because the null values first get converted to float NaN during the 
construction of the Pandas DataFrame in `from_records`, and then it is 
attempted to be converted back to to an integer where it fails.

The fix is going to check if the Pandas DataFrame can cause such failure when 
converting, if so, we don't do the conversion and use the inferred type by 
Pandas.

Closes #18945

## How was this patch tested?

Added pyspark test.

Author: Liang-Chi Hsieh 

Closes #19319 from viirya/SPARK-21766.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e6a714c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e6a714c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e6a714c

Branch: refs/heads/master
Commit: 3e6a714c9ee97ef13b3f2010babded3b63fd9d74
Parents: d2b2932
Author: Liang-Chi Hsieh 
Authored: Fri Sep 22 22:39:47 2017 +0900
Committer: hyukjinkwon 
Committed: Fri Sep 22 22:39:47 2017 +0900

--
 python/pyspark/sql/dataframe.py | 13 ++---
 python/pyspark/sql/tests.py | 12 
 2 files changed, 22 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3e6a714c/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 88ac413..7b81a0b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -37,6 +37,7 @@ from pyspark.sql.types import _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
 from pyspark.sql.readwriter import DataFrameWriter
 from pyspark.sql.streaming import DataStreamWriter
+from pyspark.sql.types import IntegralType
 from pyspark.sql.types import *
 
 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
@@ -1891,14 +1892,20 @@ class DataFrame(object):
   "if using spark.sql.execution.arrow.enable=true"
 raise ImportError("%s\n%s" % (e.message, msg))
 else:
+pdf = pd.DataFrame.from_records(self.collect(), 
columns=self.columns)
+
 dtype = {}
 for field in self.schema:
 pandas_type = _to_corrected_pandas_type(field.dataType)
-if pandas_type is not None:
+# SPARK-21766: if an integer field is nullable and has null 
values, it can be
+# inferred by pandas as float column. Once we convert the 
column with NaN back
+# to integer type e.g., np.int16, we will hit exception. So we 
use the inferred
+# float type, not the corrected type from the schema in this 
case.
+if pandas_type is not None and \
+not(isinstance(field.dataType, IntegralType) and 
field.nullable and
+pdf[field.name].isnull().any()):
 dtype[field.name] = pandas_type
 
-pdf = pd.DataFrame.from_records(self.collect(), 
columns=self.columns)
-
 for f, t in dtype.items():
 pdf[f] = pdf[f].astype(t, copy=False)
 return pdf

http://git-wip-us.apache.org/repos/asf/spark/blob/3e6a714c/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index ab76c48..3db8bee 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2564,6 +2564,18 @@ class SQLTests(ReusedPySparkTestCase):
 self.assertEquals(types[2], np.bool)
 self.assertEquals(types[3], np.float32)
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
+def test_to_pandas_avoid_astype(self):
+import numpy as np
+schema = StructType().add("a", IntegerType()).add("b", StringType())\
+ .add("c", IntegerType())
+data = [(1, "foo", 16777220), (None, "bar", None)]
+df = self.spark.createDataFrame(data, schema)
+types = df.toPandas().dtypes
+self.assertEquals(types[0], np.float64)  # doesn't convert to np.int32 
due to NaN value.
+self.assertEquals(types[1], np.object)
+

spark git commit: [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data

2017-09-22 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 10e37f6eb -> d2b2932d8


[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts 
struct and array data

## What changes were proposed in this pull request?

`OffHeapColumnVector.reserveInternal()` will only copy already inserted values 
during reallocation if `data != null`. In vectors containing arrays or structs 
this is incorrect, since there field `data` is not used at all. We need to 
check `nulls` instead.

## How was this patch tested?

Adds new tests to `ColumnVectorSuite` that reproduce the errors.

Author: Ala Luszczak 

Closes #19308 from ala/vector-realloc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2b2932d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2b2932d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2b2932d

Branch: refs/heads/master
Commit: d2b2932d8be01dee31983121f6fffd16177bf48a
Parents: 10e37f6
Author: Ala Luszczak 
Authored: Fri Sep 22 15:31:43 2017 +0200
Committer: Herman van Hovell 
Committed: Fri Sep 22 15:31:43 2017 +0200

--
 .../vectorized/OffHeapColumnVector.java |  2 +-
 .../vectorized/ColumnVectorSuite.scala  | 26 
 2 files changed, 27 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3568275..e1d3685 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -515,7 +515,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-int oldCapacity = (this.data == 0L) ? 0 : capacity;
+int oldCapacity = (nulls == 0L) ? 0 : capacity;
 if (this.resultArray != null) {
   this.lengthData =
   Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);

http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 998067a..f7b06c9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with 
BeforeAndAfterEach {
 assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, 
IntegerType) === 456)
 assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, 
DoubleType) === 5.67)
   }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts array 
data") {
+val arrayType = ArrayType(IntegerType, true)
+testVector = new OffHeapColumnVector(8, arrayType)
+
+val data = testVector.arrayData()
+(0 until 8).foreach(i => data.putInt(i, i))
+(0 until 8).foreach(i => testVector.putArray(i, i, 1))
+
+// Increase vector's capacity and reallocate the data to new bigger 
buffers.
+testVector.reserve(16)
+
+// Check that none of the values got lost/overwritten.
+val array = new ColumnVector.Array(testVector)
+(0 until 8).foreach { i =>
+  assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === 
Array(i))
+}
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts struct 
nullability") {
+val structType = new StructType().add("int", IntegerType).add("double", 
DoubleType)
+testVector = new OffHeapColumnVector(8, structType)
+(0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else 
testVector.putNotNull(i))
+testVector.reserve(16)
+(0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [UI][STREAMING] Modify the title, 'Records' instead of 'Input Size'

2017-09-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 27fc536d9 -> 10e37f6eb


[UI][STREAMING] Modify the title, 'Records' instead of 'Input Size'

## What changes were proposed in this pull request?
Spark Streaming is processing data should be record, so the title should be 
'Records', and should not be 'Input Size'.

Fix before:
![1](https://user-images.githubusercontent.com/26266482/30099599-c64d4a8a-9318-11e7-8a8d-1ca99b409323.png)

Fix after:
![2](https://user-images.githubusercontent.com/26266482/30099609-cd4df7d0-9318-11e7-8a27-dbaec6797bb1.png)

## How was this patch tested?
manual tests

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: guoxiaolong 

Closes #19144 from guoxiaolongzte/streamingUI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10e37f6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10e37f6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10e37f6e

Branch: refs/heads/master
Commit: 10e37f6eb6819c9233830c0d97e8fd1c713be0f1
Parents: 27fc536
Author: guoxiaolong 
Authored: Fri Sep 22 11:51:57 2017 +0100
Committer: Sean Owen 
Committed: Fri Sep 22 11:51:57 2017 +0100

--
 .../scala/org/apache/spark/streaming/ui/AllBatchesTable.scala| 2 +-
 .../test/scala/org/apache/spark/streaming/UISeleniumSuite.scala  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/10e37f6e/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 70b4bb4..f1070e9 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -25,7 +25,7 @@ private[ui] abstract class BatchTableBase(tableId: String, 
batchInterval: Long)
 
   protected def columns: Seq[Node] = {
 Batch Time
-  Input Size
+  Records
   Scheduling Delay
 {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit 
jobs of a batch", "top")}
   

http://git-wip-us.apache.org/repos/asf/spark/blob/10e37f6e/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index e7cec99..f2204a1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -121,11 +121,11 @@ class UISeleniumSuite
 h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of 
\\d+\\)")) should be (true)
 
 findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq 
should be {
-  List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing 
Time (?)",
+  List("Batch Time", "Records", "Scheduling Delay (?)", "Processing 
Time (?)",
 "Output Ops: Succeeded/Total", "Status")
 }
 findAll(cssSelector("""#completed-batches-table 
th""")).map(_.text).toSeq should be {
-  List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing 
Time (?)",
+  List("Batch Time", "Records", "Scheduling Delay (?)", "Processing 
Time (?)",
 "Total Delay (?)", "Output Ops: Succeeded/Total")
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21190][PYSPARK] Python Vectorized UDFs

2017-09-22 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 8f130ad40 -> 27fc536d9


[SPARK-21190][PYSPARK] Python Vectorized UDFs

This PR adds vectorized UDFs to the Python API

**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:

```
pandas_udf(DoubleType())
def plus(a, b)
return a + b
```
or

```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs

0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, 
will contain a key "size" that will give the required length of the output.  
For example:

```
pandas_udf(LongType())
def f0(**kwargs):
return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```

Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are 
available.

- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs

Author: Bryan Cutler 
Author: Takuya UESHIN 

Closes #18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27fc536d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27fc536d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27fc536d

Branch: refs/heads/master
Commit: 27fc536d9a54eccef7d1cbbe2a6a008043d62ba4
Parents: 8f130ad
Author: Bryan Cutler 
Authored: Fri Sep 22 16:17:41 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Sep 22 16:17:50 2017 +0800

--
 .../org/apache/spark/api/python/PythonRDD.scala |  22 ++-
 python/pyspark/serializers.py   |  65 +-
 python/pyspark/sql/functions.py |  49 +++--
 python/pyspark/sql/tests.py | 197 +++
 python/pyspark/sql/types.py |  27 +++
 python/pyspark/worker.py|  57 --
 .../execution/python/ArrowEvalPythonExec.scala  |  61 ++
 .../execution/python/BatchEvalPythonExec.scala  | 193 ++
 .../sql/execution/python/EvalPythonExec.scala   | 142 +
 .../execution/python/ExtractPythonUDFs.scala|  11 +-
 .../spark/sql/execution/python/PythonUDF.scala  |   3 +-
 .../python/UserDefinedPythonFunction.scala  |   5 +-
 .../python/BatchEvalPythonExecSuite.scala   |   7 +-
 13 files changed, 666 insertions(+), 173 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27fc536d/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 3377101..86d0405 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -83,10 +83,23 @@ private[spark] case class PythonFunction(
  */
 private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
 
+/**
+ * Enumerate the type of command that will be sent to the Python worker
+ */
+private[spark] object PythonEvalType {
+  val NON_UDF = 0
+  val SQL_BATCHED_UDF = 1
+  val SQL_PANDAS_UDF = 2
+}
+
 private[spark] object PythonRunner {
   def apply(func: PythonFunction, bufferSize: Int, reuse_worker: Boolean): 
PythonRunner = {
 new PythonRunner(
-  Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, 
Array(Array(0)))
+  Seq(ChainedPythonFunctions(Seq(func))),
+  bufferSize,
+  reuse_worker,
+  PythonEvalType.NON_UDF,
+  Array(Array(0)))
   }
 }
 
@@ -100,7 +113,7 @@ private[spark] class PythonRunner(
 funcs: Seq[ChainedPythonFunctions],
 bufferSize: Int,
 reuse_worker: Boolean,
-isUDF: Boolean,
+evalType: Int,
 argOffsets: Array[Array[Int]])
   extends Logging {
 
@@ -309,8 +322,8 @@ private[spark] class PythonRunner(
 }
 dataOut.flush()
 // Serialized command:
-if (isUDF) {
-  dataOut.writeInt(1)
+dataOut.writeInt(evalType)
+if (evalType != PythonEvalType.NON_UDF) {
   dataOut.writeInt(funcs.length)
   funcs.zip(argOffsets).foreach { case (chained, offsets) =>
 dataOut.writeInt(offsets.length)
@@ -324,7 +337,6 @@ private[spark] class PythonRunner(
 }
   }
 } else {
-  dataOut.writeInt(0)
   val command = funcs.head.funcs.head.command
   dataOut.writeInt(command.length)
   dataOut.write(command)


spark git commit: [SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

2017-09-22 Thread holden
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 090b987e6 -> de6274a58


[SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

## What changes were proposed in this pull request?

Check JDK version (with javac) and use SPARK_VERSION for publish-release

## How was this patch tested?

Manually tried local build with wrong JDK / JAVA_HOME & built a local release 
(LFTP disabled)

Author: Holden Karau 

Closes #19312 from holdenk/improve-release-scripts-r2.

(cherry picked from commit 8f130ad40178e35fecb3f2ba4a61ad23e6a90e3d)
Signed-off-by: Holden Karau 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de6274a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de6274a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de6274a5

Branch: refs/heads/branch-2.2
Commit: de6274a585fdc2eb9252dc5d5688ce3f3e9e0c39
Parents: 090b987
Author: Holden Karau 
Authored: Fri Sep 22 00:14:57 2017 -0700
Committer: Holden Karau 
Committed: Fri Sep 22 00:15:12 2017 -0700

--
 dev/create-release/release-build.sh | 33 ++--
 1 file changed, 31 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/de6274a5/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index a72307a..f93a96b 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -31,8 +31,8 @@ Top level targets are
 All other inputs are environment variables
 
 GIT_REF - Release tag or commit to build from
-SPARK_VERSION - Release identifier used when publishing
-SPARK_PACKAGE_VERSION - Release identifier in top level package directory
+SPARK_VERSION - Version of Spark being built (e.g. 2.1.2)
+SPARK_PACKAGE_VERSION - Release identifier in top level package directory 
(e.g. 2.1.2-rc1)
 REMOTE_PARENT_DIR - Parent in which to create doc or release builds.
 REMOTE_PARENT_MAX_LENGTH - If set, parent directory will be cleaned to only
  have this number of subdirectories (by deleting old ones). WARNING: This 
deletes data.
@@ -95,6 +95,33 @@ if [ -z "$SPARK_VERSION" ]; then
 | grep -v INFO | grep -v WARNING | grep -v Download)
 fi
 
+# Verify we have the right java version set
+if [ -z "$JAVA_HOME" ]; then
+  echo "Please set JAVA_HOME."
+  exit 1
+fi
+
+java_version=$("${JAVA_HOME}"/bin/javac -version 2>&1 | cut -d " " -f 2)
+
+if [[ ! $SPARK_VERSION < "2.2." ]]; then
+  if [[ $java_version < "1.8." ]]; then
+echo "Java version $java_version is less than required 1.8 for 2.2+"
+echo "Please set JAVA_HOME correctly."
+exit 1
+  fi
+else
+  if [[ $java_version > "1.7." ]]; then
+if [ -z "$JAVA_7_HOME" ]; then
+  echo "Java version $java_version is higher than required 1.7 for pre-2.2"
+  echo "Please set JAVA_HOME correctly."
+  exit 1
+else
+  JAVA_HOME="$JAVA_7_HOME"
+fi
+  fi
+fi
+
+
 if [ -z "$SPARK_PACKAGE_VERSION" ]; then
   SPARK_PACKAGE_VERSION="${SPARK_VERSION}-$(date +%Y_%m_%d_%H_%M)-${git_hash}"
 fi
@@ -318,6 +345,8 @@ if [[ "$1" == "publish-snapshot" ]]; then
 fi
 
 if [[ "$1" == "publish-release" ]]; then
+  SPARK_VERSION=$SPARK_PACKAGE_VERSION
+
   cd spark
   # Publish Spark to Maven release repo
   echo "Publishing Spark checkout at '$GIT_REF' ($git_hash)"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

2017-09-22 Thread holden
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1a4b6eea8 -> d930bbb40


[SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

## What changes were proposed in this pull request?

Check JDK version (with javac) and use SPARK_VERSION for publish-release

## How was this patch tested?

Manually tried local build with wrong JDK / JAVA_HOME & built a local release 
(LFTP disabled)

Author: Holden Karau 

Closes #19312 from holdenk/improve-release-scripts-r2.

(cherry picked from commit 8f130ad40178e35fecb3f2ba4a61ad23e6a90e3d)
Signed-off-by: Holden Karau 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d930bbb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d930bbb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d930bbb4

Branch: refs/heads/branch-2.1
Commit: d930bbb40648a39c44780ba51513489923babd3e
Parents: 1a4b6ee
Author: Holden Karau 
Authored: Fri Sep 22 00:14:57 2017 -0700
Committer: Holden Karau 
Committed: Fri Sep 22 00:15:27 2017 -0700

--
 dev/create-release/release-build.sh | 33 ++--
 1 file changed, 31 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d930bbb4/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index c4ddc21..fa889d9 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -31,8 +31,8 @@ Top level targets are
 All other inputs are environment variables
 
 GIT_REF - Release tag or commit to build from
-SPARK_VERSION - Release identifier used when publishing
-SPARK_PACKAGE_VERSION - Release identifier in top level package directory
+SPARK_VERSION - Version of Spark being built (e.g. 2.1.2)
+SPARK_PACKAGE_VERSION - Release identifier in top level package directory 
(e.g. 2.1.2-rc1)
 REMOTE_PARENT_DIR - Parent in which to create doc or release builds.
 REMOTE_PARENT_MAX_LENGTH - If set, parent directory will be cleaned to only
  have this number of subdirectories (by deleting old ones). WARNING: This 
deletes data.
@@ -95,6 +95,33 @@ if [ -z "$SPARK_VERSION" ]; then
 | grep -v INFO | grep -v WARNING | grep -v Download)
 fi
 
+# Verify we have the right java version set
+if [ -z "$JAVA_HOME" ]; then
+  echo "Please set JAVA_HOME."
+  exit 1
+fi
+
+java_version=$("${JAVA_HOME}"/bin/javac -version 2>&1 | cut -d " " -f 2)
+
+if [[ ! $SPARK_VERSION < "2.2." ]]; then
+  if [[ $java_version < "1.8." ]]; then
+echo "Java version $java_version is less than required 1.8 for 2.2+"
+echo "Please set JAVA_HOME correctly."
+exit 1
+  fi
+else
+  if [[ $java_version > "1.7." ]]; then
+if [ -z "$JAVA_7_HOME" ]; then
+  echo "Java version $java_version is higher than required 1.7 for pre-2.2"
+  echo "Please set JAVA_HOME correctly."
+  exit 1
+else
+  JAVA_HOME="$JAVA_7_HOME"
+fi
+  fi
+fi
+
+
 if [ -z "$SPARK_PACKAGE_VERSION" ]; then
   SPARK_PACKAGE_VERSION="${SPARK_VERSION}-$(date +%Y_%m_%d_%H_%M)-${git_hash}"
 fi
@@ -322,6 +349,8 @@ if [[ "$1" == "publish-snapshot" ]]; then
 fi
 
 if [[ "$1" == "publish-release" ]]; then
+  SPARK_VERSION=$SPARK_PACKAGE_VERSION
+
   cd spark
   # Publish Spark to Maven release repo
   echo "Publishing Spark checkout at '$GIT_REF' ($git_hash)"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

2017-09-22 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master 5960686e7 -> 8f130ad40


[SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

## What changes were proposed in this pull request?

Check JDK version (with javac) and use SPARK_VERSION for publish-release

## How was this patch tested?

Manually tried local build with wrong JDK / JAVA_HOME & built a local release 
(LFTP disabled)

Author: Holden Karau 

Closes #19312 from holdenk/improve-release-scripts-r2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f130ad4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f130ad4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f130ad4

Branch: refs/heads/master
Commit: 8f130ad40178e35fecb3f2ba4a61ad23e6a90e3d
Parents: 5960686
Author: Holden Karau 
Authored: Fri Sep 22 00:14:57 2017 -0700
Committer: Holden Karau 
Committed: Fri Sep 22 00:14:57 2017 -0700

--
 dev/create-release/release-build.sh | 33 ++--
 1 file changed, 31 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8f130ad4/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index f4a7f25..8de1d6a 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -31,8 +31,8 @@ Top level targets are
 All other inputs are environment variables
 
 GIT_REF - Release tag or commit to build from
-SPARK_VERSION - Release identifier used when publishing
-SPARK_PACKAGE_VERSION - Release identifier in top level package directory
+SPARK_VERSION - Version of Spark being built (e.g. 2.1.2)
+SPARK_PACKAGE_VERSION - Release identifier in top level package directory 
(e.g. 2.1.2-rc1)
 REMOTE_PARENT_DIR - Parent in which to create doc or release builds.
 REMOTE_PARENT_MAX_LENGTH - If set, parent directory will be cleaned to only
  have this number of subdirectories (by deleting old ones). WARNING: This 
deletes data.
@@ -104,6 +104,33 @@ if [ -z "$SPARK_VERSION" ]; then
 | grep -v INFO | grep -v WARNING | grep -v Download)
 fi
 
+# Verify we have the right java version set
+if [ -z "$JAVA_HOME" ]; then
+  echo "Please set JAVA_HOME."
+  exit 1
+fi
+
+java_version=$("${JAVA_HOME}"/bin/javac -version 2>&1 | cut -d " " -f 2)
+
+if [[ ! $SPARK_VERSION < "2.2." ]]; then
+  if [[ $java_version < "1.8." ]]; then
+echo "Java version $java_version is less than required 1.8 for 2.2+"
+echo "Please set JAVA_HOME correctly."
+exit 1
+  fi
+else
+  if [[ $java_version > "1.7." ]]; then
+if [ -z "$JAVA_7_HOME" ]; then
+  echo "Java version $java_version is higher than required 1.7 for pre-2.2"
+  echo "Please set JAVA_HOME correctly."
+  exit 1
+else
+  JAVA_HOME="$JAVA_7_HOME"
+fi
+  fi
+fi
+
+
 if [ -z "$SPARK_PACKAGE_VERSION" ]; then
   SPARK_PACKAGE_VERSION="${SPARK_VERSION}-$(date +%Y_%m_%d_%H_%M)-${git_hash}"
 fi
@@ -326,6 +353,8 @@ if [[ "$1" == "publish-snapshot" ]]; then
 fi
 
 if [[ "$1" == "publish-release" ]]; then
+  SPARK_VERSION=$SPARK_PACKAGE_VERSION
+
   cd spark
   # Publish Spark to Maven release repo
   echo "Publishing Spark checkout at '$GIT_REF' ($git_hash)"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

2017-09-22 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 5ac96854c -> 5960686e7


[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering 
correctly during physical planning

## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the 
fact that its children have already been sorted on the join keys, while this is 
often not true until EnsureRequirements has been applied. So we ended up not 
getting the correct outputOrdering during physical planning stage before Sort 
nodes are added to the children.

For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should 
include "key1 ASC, sameOrderExp=c1"

So to fix this I changed the  behavior of getKeyOrdering(keys, 
childOutputOrdering) to:
1. If the childOutputOrdering satisfies (is a superset of) the required child 
ordering => childOutputOrdering
2. Otherwise => required child ordering

In addition, I organized the logic for deciding the relationship between two 
orderings into SparkPlan, so that it can be reused by EnsureRequirements and 
SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases.
Passed all integration tests.

Author: maryannxue 

Closes #19281 from maryannxue/spark-21998.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5960686e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5960686e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5960686e

Branch: refs/heads/master
Commit: 5960686e791b5d6642a30c43c1de61e96e594a5e
Parents: 5ac9685
Author: maryannxue 
Authored: Thu Sep 21 23:54:16 2017 -0700
Committer: gatorsmile 
Committed: Thu Sep 21 23:54:16 2017 -0700

--
 .../sql/catalyst/expressions/SortOrder.scala| 23 
 .../execution/exchange/EnsureRequirements.scala | 21 ++-
 .../sql/execution/joins/SortMergeJoinExec.scala | 17 --
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 62 
 4 files changed, 102 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index abcb9a2..ff7c98f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -96,6 +96,29 @@ object SortOrder {
  sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
 new SortOrder(child, direction, direction.defaultNullOrdering, 
sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of 
SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is 
an equivalent of A
+   * or of A's prefix. Here are examples of ordering A satisfying ordering B:
+   * 
+   *   ordering A is [x, y] and ordering B is [x]
+   *   ordering A is [x(sameOrderExpressions=x1)] and ordering B is 
[x1]
+   *   ordering A is [x(sameOrderExpressions=x1), y] and ordering B is 
[x1]
+   * 
+   */
+  def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): 
Boolean = {
+if (ordering2.isEmpty) {
+  true
+} else if (ordering2.length > ordering1.length) {
+  false
+} else {
+  ordering2.zip(ordering1).forall {
+case (o2, o1) => o1.satisfies(o2)
+  }
+}
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b91d077..1da72f2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -234,24 +234,11 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 
 // Now that we've performed any necessary shuffles, add sorts to guarantee 
output