spark git commit: [SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"

2017-12-29 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f2b3525c1 -> 14c4a62c1


[SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"

## What changes were proposed in this pull request?

This reverts commit 5fd0294ff8960982cfb3b901f84bc91a9f51bf28 because of a huge 
performance regression.
I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.

`Files.newInputStream` returns a `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it falls back to the default `InputStream.skip`, which simply reads and discards data rather than seeking. This causes a huge performance regression when reading shuffle files.
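
For illustration, here is a minimal Scala sketch of the two code paths involved (the path and offset below are made up; this is not Spark code): the stream returned by `Files.newInputStream` ends up reading and throwing away every skipped byte, while `FileInputStream.skip` just moves the file pointer.

```
import java.io.{FileInputStream, InputStream}
import java.nio.file.{Files, Paths}

val path = Paths.get("/tmp/shuffle_0_0.data")   // hypothetical shuffle file
val offset = 64L * 1024 * 1024                  // start of the wanted segment

// NIO path: on the JDKs in question, ChannelInputStream inherits the default
// InputStream.skip, which read()s and discards `offset` bytes.
val nioStream: InputStream = Files.newInputStream(path)
nioStream.skip(offset)                          // reads ~64 MB just to throw it away

// Path restored by this revert: FileInputStream.skip repositions the file
// descriptor, so no data is copied.
val fis = new FileInputStream(path.toFile)
fis.skip(offset)                                // effectively a seek
```

This matters because `FileSegmentManagedBuffer.createInputStream` calls `ByteStreams.skipFully(is, offset)` on this stream for every shuffle segment read.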

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #20119 from zsxwing/revert-SPARK-21475.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14c4a62c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14c4a62c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14c4a62c

Branch: refs/heads/master
Commit: 14c4a62c126ac5e542f7b82565b5f0fcf3e7fa5a
Parents: f2b3525
Author: Shixiong Zhu 
Authored: Fri Dec 29 22:33:29 2017 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 29 22:33:29 2017 -0800

--
 .../buffer/FileSegmentManagedBuffer.java|  9 -
 .../network/shuffle/OneForOneBlockFetcher.java  |  4 ++--
 .../shuffle/ShuffleIndexInformation.java|  4 ++--
 .../sort/BypassMergeSortShuffleWriter.java  | 16 +++
 .../spark/shuffle/sort/UnsafeShuffleWriter.java |  9 -
 .../shuffle/IndexShuffleBlockResolver.scala |  6 ++
 .../util/collection/ExternalAppendOnlyMap.scala | 21 +---
 .../spark/util/collection/ExternalSorter.scala  | 17 +++-
 8 files changed, 37 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index ea9b3ce..c20fab8 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
 
   @Override
   public InputStream createInputStream() throws IOException {
-InputStream is = null;
+FileInputStream is = null;
 try {
-  is = Files.newInputStream(file.toPath());
+  is = new FileInputStream(file);
   ByteStreams.skipFully(is, offset);
   return new LimitedInputStream(is, length);
 } catch (IOException e) {
@@ -133,7 +132,7 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
 if (conf.lazyFileDescriptor()) {
   return new DefaultFileRegion(file, offset, length);
 } else {
-  FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.READ);
+  FileChannel fileChannel = new FileInputStream(file).getChannel();
   return new DefaultFileRegion(fileChannel, offset, length);
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 3f2f20b..9cac7d0 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.

spark git commit: [SPARK-22771][SQL] Concatenate binary inputs into a binary output

2017-12-29 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 2ea17afb6 -> f2b3525c1


[SPARK-22771][SQL] Concatenate binary inputs into a binary output

## What changes were proposed in this pull request?
This PR modifies `concat` to concatenate binary inputs into a single binary output. `concat` in the current master always outputs data as a string, but in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.
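
As a rough illustration (spark-shell style; the column names are made up and this is not taken from the added tests), the intended behaviour looks like this:

```
// Sketch only: assumes a spark-shell session, i.e. spark.implicits._ in scope.
import org.apache.spark.sql.functions.concat

val df = Seq((Array[Byte](1, 2), Array[Byte](3, 4))).toDF("a", "b")

// Both inputs are BinaryType, so with this change the result column is
// reported as binary instead of being coerced to string as before.
df.select(concat($"a", $"b").as("ab")).printSchema()
```

The SQLConf change in the diffstat provides a switch for keeping the old string-producing behaviour.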

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro 

Closes #19977 from maropu/SPARK-22771.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2b3525c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2b3525c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2b3525c

Branch: refs/heads/master
Commit: f2b3525c17d660cf6f082bbafea8632615b4f58e
Parents: 2ea17af
Author: Takeshi Yamamuro 
Authored: Sat Dec 30 14:09:56 2017 +0800
Committer: gatorsmile 
Committed: Sat Dec 30 14:09:56 2017 +0800

--
 R/pkg/R/functions.R |   3 +-
 .../apache/spark/unsafe/types/ByteArray.java|  25 ++
 docs/sql-programming-guide.md   |   2 +
 python/pyspark/sql/functions.py |   3 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   2 +-
 .../sql/catalyst/analysis/TypeCoercion.scala|  26 +-
 .../expressions/stringExpressions.scala |  52 +++-
 .../sql/catalyst/optimizer/expressions.scala|  15 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../catalyst/analysis/TypeCoercionSuite.scala   |  54 +
 .../optimizer/CombineConcatsSuite.scala |  14 +-
 .../scala/org/apache/spark/sql/functions.scala  |   3 +-
 .../sql-tests/inputs/string-functions.sql   |  23 ++
 .../inputs/typeCoercion/native/concat.sql   |  93 
 .../sql-tests/results/string-functions.sql.out  |  45 +++-
 .../results/typeCoercion/native/concat.sql.out  | 239 +++
 16 files changed, 587 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f2b3525c/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index fff230d..55365a4 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2133,7 +2133,8 @@ setMethod("countDistinct",
   })
 
 #' @details
-#' \code{concat}: Concatenates multiple input string columns together into a 
single string column.
+#' \code{concat}: Concatenates multiple input columns together into a single 
column.
+#' If all inputs are binary, concat returns an output as binary. Otherwise, it 
returns as string.
 #'
 #' @rdname column_string_functions
 #' @aliases concat concat,Column-method

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b3525c/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
index 7ced13d..c03caf0 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
@@ -74,4 +74,29 @@ public final class ByteArray {
 }
 return Arrays.copyOfRange(bytes, start, end);
   }
+
+  public static byte[] concat(byte[]... inputs) {
+// Compute the total length of the result
+int totalLength = 0;
+for (int i = 0; i < inputs.length; i++) {
+  if (inputs[i] != null) {
+totalLength += inputs[i].length;
+  } else {
+return null;
+  }
+}
+
+// Allocate a new byte array, and copy the inputs one by one into it
+final byte[] result = new byte[totalLength];
+int offset = 0;
+for (int i = 0; i < inputs.length; i++) {
+  int len = inputs[i].length;
+  Platform.copyMemory(
+inputs[i], Platform.BYTE_ARRAY_OFFSET,
+result, Platform.BYTE_ARRAY_OFFSET + offset,
+len);
+  offset += len;
+}
+return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b3525c/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index f02f462..4b5f56c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1780,6 +1780,8 @@ options.
  
  - Since Spark 2.3, when either broadcast hash join or broadcast nested loop 
join is applicable, we prefer to broadcasting the table that is explicitly 
specified in a broadcast hint. For details, see the section [Broadcast 
Hint](#broadcast-hint-for

svn commit: r23955 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_20_01-8169630-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-29 Thread pwendell
Author: pwendell
Date: Sat Dec 30 04:14:51 2017
New Revision: 23955

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_29_20_01-8169630 docs


[This commit notification would consist of 1425 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-22881][ML][TEST] ML regression package testsuite add StructuredStreaming test

2017-12-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 816963043 -> 2ea17afb6


[SPARK-22881][ML][TEST] ML regression package testsuite add StructuredStreaming 
test

## What changes were proposed in this pull request?

This adds Structured Streaming tests to the ML regression package test suite.

In order to make the test suite easier to modify, a new helper function was added in `MLTest` (a hypothetical call site is sketched after the signature):
```
def testTransformerByGlobalCheckFunc[A : Encoder](
  dataframe: DataFrame,
  transformer: Transformer,
  firstResultCol: String,
  otherResultCols: String*)
  (globalCheckFunction: Seq[Row] => Unit): Unit
```
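
A hypothetical call site (the dataset, model, and assertion below are made up), just to show the shape of a test that uses the helper:

```
// Sketch only: lives inside a suite extending MLTest; `df` and `model` are
// that suite's fixtures.
import org.apache.spark.ml.linalg.Vector

testTransformerByGlobalCheckFunc[(Double, Vector)](df, model, "prediction") { rows =>
  // A single global assertion over all collected output rows; the helper runs
  // it for both batch and streaming versions of the input.
  assert(rows.map(_.getDouble(0)).forall(p => p == 0.0 || p == 1.0))
}
```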

## How was this patch tested?

N/A

Author: WeichenXu 
Author: Bago Amirbekian 

Closes #19979 from WeichenXu123/ml_stream_test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ea17afb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ea17afb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ea17afb

Branch: refs/heads/master
Commit: 2ea17afb63f976500273518bf1b32f9efe250812
Parents: 8169630
Author: WeichenXu 
Authored: Fri Dec 29 20:06:56 2017 -0800
Committer: Joseph K. Bradley 
Committed: Fri Dec 29 20:06:56 2017 -0800

--
 .../regression/AFTSurvivalRegressionSuite.scala | 19 
 .../regression/DecisionTreeRegressorSuite.scala | 43 +-
 .../spark/ml/regression/GBTRegressorSuite.scala | 23 +-
 .../GeneralizedLinearRegressionSuite.scala  | 47 ++--
 .../ml/regression/IsotonicRegressionSuite.scala | 43 +++---
 .../ml/regression/LinearRegressionSuite.scala   | 25 ++-
 .../scala/org/apache/spark/ml/util/MLTest.scala | 39 
 .../org/apache/spark/ml/util/MLTestSuite.scala  | 12 -
 .../apache/spark/sql/streaming/StreamTest.scala | 27 ++-
 9 files changed, 147 insertions(+), 131 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ea17afb/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
index 02e5c6d..4e4ff71 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
@@ -19,19 +19,16 @@ package org.apache.spark.ml.regression
 
 import scala.util.Random
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.random.{ExponentialGenerator, WeibullGenerator}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types._
 
-class AFTSurvivalRegressionSuite
-  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class AFTSurvivalRegressionSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -191,8 +188,8 @@ class AFTSurvivalRegressionSuite
 assert(model.predict(features) ~== responsePredictR relTol 1E-3)
 assert(model.predictQuantiles(features) ~== quantilePredictR relTol 1E-3)
 
-model.transform(datasetUnivariate).select("features", "prediction", 
"quantiles")
-  .collect().foreach {
+testTransformer[(Vector, Double, Double)](datasetUnivariate, model,
+  "features", "prediction", "quantiles") {
 case Row(features: Vector, prediction: Double, quantiles: Vector) =>
   assert(prediction ~== model.predict(features) relTol 1E-5)
   assert(quantiles ~== model.predictQuantiles(features) relTol 1E-5)
@@ -261,8 +258,8 @@ class AFTSurvivalRegressionSuite
 assert(model.predict(features) ~== responsePredictR relTol 1E-3)
 assert(model.predictQuantiles(features) ~== quantilePredictR relTol 1E-3)
 
-model.transform(datasetMultivariate).select("features", "prediction", 
"quantiles")
-  .collect().foreach {
+testTransformer[(Vector, Double, Double)](datasetMultivariate, model,
+  "features", "prediction", "quantiles") {
 case Row(features: Vector, prediction: Double, quantiles: Vector) =>
   assert(prediction ~== model.predict(features) relTol 1E-5)
   assert(quantiles ~== model.predictQuantiles(features) relTol 1E-5)
@@ -331,8 +328,8 @@ class AFTSurvivalRegressionSuite
 assert(model.predict(features) ~== respon

spark git commit: [SPARK-22734][ML][PYSPARK] Added Python API for VectorSizeHint.

2017-12-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 30fcdc038 -> 816963043


[SPARK-22734][ML][PYSPARK] Added Python API for VectorSizeHint.

Python API for VectorSizeHint Transformer.

doc-tests.

Author: Bago Amirbekian 

Closes #20112 from MrBago/vectorSizeHint-PythonAPI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81696304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81696304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81696304

Branch: refs/heads/master
Commit: 816963043a09b082e8b9280dfa97fdaa19211015
Parents: 30fcdc0
Author: Bago Amirbekian 
Authored: Fri Dec 29 19:45:14 2017 -0800
Committer: Joseph K. Bradley 
Committed: Fri Dec 29 19:45:14 2017 -0800

--
 .../spark/ml/feature/VectorSizeHint.scala   |  1 +
 python/pyspark/ml/feature.py| 79 
 2 files changed, 80 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81696304/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
index 1fe3cfc..f5947d6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types.StructType
  * VectorAssembler needs size information for its input columns and cannot be 
used on streaming
  * dataframes without this metadata.
  *
+ * Note: VectorSizeHint modifies `inputCol` to include size metadata and does 
not have an outputCol.
  */
 @Experimental
 @Since("2.3.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/81696304/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 608f2a5..5094324 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -57,6 +57,7 @@ __all__ = ['Binarizer',
'Tokenizer',
'VectorAssembler',
'VectorIndexer', 'VectorIndexerModel',
+   'VectorSizeHint',
'VectorSlicer',
'Word2Vec', 'Word2VecModel']
 
@@ -3466,6 +3467,84 @@ class ChiSqSelectorModel(JavaModel, JavaMLReadable, 
JavaMLWritable):
 return self._call_java("selectedFeatures")
 
 
+@inherit_doc
+class VectorSizeHint(JavaTransformer, HasInputCol, HasHandleInvalid, 
JavaMLReadable,
+ JavaMLWritable):
+"""
+.. note:: Experimental
+
+A feature transformer that adds size information to the metadata of a 
vector column.
+VectorAssembler needs size information for its input columns and cannot be 
used on streaming
+dataframes without this metadata.
+
+.. note:: VectorSizeHint modifies `inputCol` to include size metadata and 
does not have an
+outputCol.
+
+>>> from pyspark.ml.linalg import Vectors
+>>> from pyspark.ml import Pipeline, PipelineModel
+>>> data = [(Vectors.dense([1., 2., 3.]), 4.)]
+>>> df = spark.createDataFrame(data, ["vector", "float"])
+>>>
+>>> sizeHint = VectorSizeHint(inputCol="vector", size=3, 
handleInvalid="skip")
+>>> vecAssembler = VectorAssembler(inputCols=["vector", "float"], 
outputCol="assembled")
+>>> pipeline = Pipeline(stages=[sizeHint, vecAssembler])
+>>>
+>>> pipelineModel = pipeline.fit(df)
+>>> pipelineModel.transform(df).head().assembled
+DenseVector([1.0, 2.0, 3.0, 4.0])
+>>> vectorSizeHintPath = temp_path + "/vector-size-hint-pipeline"
+>>> pipelineModel.save(vectorSizeHintPath)
+>>> loadedPipeline = PipelineModel.load(vectorSizeHintPath)
+>>> loaded = loadedPipeline.transform(df).head().assembled
+>>> expected = pipelineModel.transform(df).head().assembled
+>>> loaded == expected
+True
+
+.. versionadded:: 2.3.0
+"""
+
+size = Param(Params._dummy(), "size", "Size of vectors in column.",
+ typeConverter=TypeConverters.toInt)
+
+handleInvalid = Param(Params._dummy(), "handleInvalid",
+  "How to handle invalid vectors in inputCol. Invalid 
vectors include "
+  "nulls and vectors with the wrong size. The options 
are `skip` (filter "
+  "out rows with invalid vectors), `error` (throw an 
error) and "
+  "`optimistic` (do not check the vector size, and 
keep all rows). "
+  "`error` by default.",

spark git commit: [SPARK-22922][ML][PYSPARK] Pyspark portion of the fit-multiple API

2017-12-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master ccda75b0d -> 30fcdc038


[SPARK-22922][ML][PYSPARK] Pyspark portion of the fit-multiple API

## What changes were proposed in this pull request?

Adds a `fitMultiple` API to `Estimator` with a default implementation, and updates the ml.tuning meta-estimators to use this API.

## How was this patch tested?

Unit tests.

Author: Bago Amirbekian 

Closes #20058 from MrBago/python-fitMultiple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30fcdc03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30fcdc03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30fcdc03

Branch: refs/heads/master
Commit: 30fcdc0380de4f107977d39d067b07e149ab2cb1
Parents: ccda75b
Author: Bago Amirbekian 
Authored: Fri Dec 29 16:31:25 2017 -0800
Committer: Joseph K. Bradley 
Committed: Fri Dec 29 16:31:25 2017 -0800

--
 python/pyspark/ml/base.py   | 69 ++--
 python/pyspark/ml/tests.py  | 15 +
 python/pyspark/ml/tuning.py | 44 -
 3 files changed, 110 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/30fcdc03/python/pyspark/ml/base.py
--
diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index a6767ce..d4470b5 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -18,13 +18,52 @@
 from abc import ABCMeta, abstractmethod
 
 import copy
+import threading
 
 from pyspark import since
-from pyspark.ml.param import Params
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
 from pyspark.sql.functions import udf
-from pyspark.sql.types import StructField, StructType, DoubleType
+from pyspark.sql.types import StructField, StructType
+
+
+class _FitMultipleIterator(object):
+"""
+Used by default implementation of Estimator.fitMultiple to produce models 
in a thread safe
+iterator. This class handles the simple case of fitMultiple where each 
param map should be
+fit independently.
+
+:param fitSingleModel: Function: (int => Model) which fits an estimator to 
a dataset.
+`fitSingleModel` may be called up to `numModels` times, with a unique 
index each time.
+Each call to `fitSingleModel` with an index should return the Model 
associated with
+that index.
+:param numModel: Number of models this iterator should produce.
+
+See Estimator.fitMultiple for more info.
+"""
+def __init__(self, fitSingleModel, numModels):
+"""
+
+"""
+self.fitSingleModel = fitSingleModel
+self.numModel = numModels
+self.counter = 0
+self.lock = threading.Lock()
+
+def __iter__(self):
+return self
+
+def __next__(self):
+with self.lock:
+index = self.counter
+if index >= self.numModel:
+raise StopIteration("No models remaining.")
+self.counter += 1
+return index, self.fitSingleModel(index)
+
+def next(self):
+"""For python2 compatibility."""
+return self.__next__()
 
 
 @inherit_doc
@@ -47,6 +86,27 @@ class Estimator(Params):
 """
 raise NotImplementedError()
 
+@since("2.3.0")
+def fitMultiple(self, dataset, paramMaps):
+"""
+Fits a model to the input dataset for each param map in `paramMaps`.
+
+:param dataset: input dataset, which is an instance of 
:py:class:`pyspark.sql.DataFrame`.
+:param paramMaps: A Sequence of param maps.
+:return: A thread safe iterable which contains one model for each 
param map. Each
+ call to `next(modelIterator)` will return `(index, model)` 
where model was fit
+ using `paramMaps[index]`. `index` values may not be 
sequential.
+
+.. note:: DeveloperApi
+.. note:: Experimental
+"""
+estimator = self.copy()
+
+def fitSingleModel(index):
+return estimator.fit(dataset, paramMaps[index])
+
+return _FitMultipleIterator(fitSingleModel, len(paramMaps))
+
 @since("1.3.0")
 def fit(self, dataset, params=None):
 """
@@ -61,7 +121,10 @@ class Estimator(Params):
 if params is None:
 params = dict()
 if isinstance(params, (list, tuple)):
-return [self.fit(dataset, paramMap) for paramMap in params]
+models = [None] * len(params)
+for index, model in self.fitMultiple(dataset, params):
+models[index] = model
+return models
 elif isinstance(params, dict):
 if params:
 return self.copy(params)._fit(dataset)

http://git-wip-us.apache.org/repos/asf/spark/bl

svn commit: r23954 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_16_01-ccda75b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-29 Thread pwendell
Author: pwendell
Date: Sat Dec 30 00:14:47 2017
New Revision: 23954

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_29_16_01-ccda75b docs


[This commit notification would consist of 1425 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning

2017-12-29 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master 66a7d6b30 -> ccda75b0d


[SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning

Small bug fix from the last PR; ran a successful merge with this code.

Author: Imran Rashid 

Closes #20117 from squito/SPARK-22921.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccda75b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccda75b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccda75b0

Branch: refs/heads/master
Commit: ccda75b0d17637abaa7ab54b68e4db8e1a16f2ce
Parents: 66a7d6b
Author: Imran Rashid 
Authored: Fri Dec 29 17:07:01 2017 -0600
Committer: Imran Rashid 
Committed: Fri Dec 29 17:07:01 2017 -0600

--
 dev/merge_spark_pr.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ccda75b0/dev/merge_spark_pr.py
--
diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index cd5dd1e..57ca840 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -242,8 +242,8 @@ def resolve_jira_issue(merge_branches, comment, 
default_jira_id=""):
 cur_summary = issue.fields.summary
 cur_assignee = issue.fields.assignee
 if cur_assignee is None:
-cur_assignee = choose_jira_assignee(issue)
-# Check again, we might not have chose an assignee
+cur_assignee = choose_jira_assignee(issue, asf_jira)
+# Check again, we might not have chosen an assignee
 if cur_assignee is None:
 cur_assignee = "NOT ASSIGNED!!!"
 else:





svn commit: r23948 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_12_01-66a7d6b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-29 Thread pwendell
Author: pwendell
Date: Fri Dec 29 20:14:47 2017
New Revision: 23948

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_29_12_01-66a7d6b docs


[This commit notification would consist of 1425 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-22920][SPARKR] sql functions for current_date, current_timestamp, rtrim/ltrim/trim with trimString

2017-12-29 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master afc364146 -> 66a7d6b30


[SPARK-22920][SPARKR] sql functions for current_date, current_timestamp, 
rtrim/ltrim/trim with trimString

## What changes were proposed in this pull request?

Add SQL functions for current_date, current_timestamp, and rtrim/ltrim/trim with a trimString argument.

## How was this patch tested?

manual, unit tests

Author: Felix Cheung 

Closes #20105 from felixcheung/rsqlfuncs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66a7d6b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66a7d6b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66a7d6b3

Branch: refs/heads/master
Commit: 66a7d6b30fe5581d09ef660abe2a9c8c334d29f2
Parents: afc3641
Author: Felix Cheung 
Authored: Fri Dec 29 10:51:43 2017 -0800
Committer: Felix Cheung 
Committed: Fri Dec 29 10:51:43 2017 -0800

--
 R/pkg/DESCRIPTION |   1 +
 R/pkg/NAMESPACE   |   2 +
 R/pkg/R/functions.R   | 105 -
 R/pkg/R/generics.R|  17 -
 R/pkg/tests/fulltests/test_sparkSQL.R |   4 +-
 5 files changed, 106 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/66a7d6b3/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index d1c846c..6d46c31 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -59,3 +59,4 @@ Collate:
 'window.R'
 RoxygenNote: 5.0.1
 VignetteBuilder: knitr
+NeedsCompilation: no

http://git-wip-us.apache.org/repos/asf/spark/blob/66a7d6b3/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ce3eec0..3219c6f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -228,6 +228,8 @@ exportMethods("%<=>%",
   "crc32",
   "create_array",
   "create_map",
+  "current_date",
+  "current_timestamp",
   "hash",
   "cume_dist",
   "date_add",

http://git-wip-us.apache.org/repos/asf/spark/blob/66a7d6b3/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 3a96f94..fff230d 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -39,7 +39,8 @@ NULL
 #' Date time functions defined for \code{Column}.
 #'
 #' @param x Column to compute on. In \code{window}, it must be a time Column of
-#'  \code{TimestampType}.
+#'  \code{TimestampType}. This is not used with \code{current_date} and
+#'  \code{current_timestamp}
 #' @param format The format for the given dates or timestamps in Column 
\code{x}. See the
 #'   format used in the following methods:
 #'   \itemize{
@@ -1109,10 +1110,11 @@ setMethod("lower",
   })
 
 #' @details
-#' \code{ltrim}: Trims the spaces from left end for the specified string value.
+#' \code{ltrim}: Trims the spaces from left end for the specified string 
value. Optionally a
+#' \code{trimString} can be specified.
 #'
 #' @rdname column_string_functions
-#' @aliases ltrim ltrim,Column-method
+#' @aliases ltrim ltrim,Column,missing-method
 #' @export
 #' @examples
 #'
@@ -1128,12 +1130,24 @@ setMethod("lower",
 #' head(tmp)}
 #' @note ltrim since 1.5.0
 setMethod("ltrim",
-  signature(x = "Column"),
-  function(x) {
+  signature(x = "Column", trimString = "missing"),
+  function(x, trimString) {
 jc <- callJStatic("org.apache.spark.sql.functions", "ltrim", x@jc)
 column(jc)
   })
 
+#' @param trimString a character string to trim with
+#' @rdname column_string_functions
+#' @aliases ltrim,Column,character-method
+#' @export
+#' @note ltrim(Column, character) since 2.3.0
+setMethod("ltrim",
+  signature(x = "Column", trimString = "character"),
+  function(x, trimString) {
+jc <- callJStatic("org.apache.spark.sql.functions", "ltrim", x@jc, 
trimString)
+column(jc)
+  })
+
 #' @details
 #' \code{max}: Returns the maximum value of the expression in a group.
 #'
@@ -1348,19 +1362,31 @@ setMethod("bround",
   })
 
 #' @details
-#' \code{rtrim}: Trims the spaces from right end for the specified string 
value.
+#' \code{rtrim}: Trims the spaces from right end for the specified string 
value. Optionally a
+#' \code{trimString} can be specified.
 #'
 #' @rdname column_string_functions
-#' @aliases rtrim rtrim,Column-method
+#' @aliases rtrim rtrim,Column,missing-method
 #' @export
 #' @note rtrim since 1.5.0
 setMethod("rtrim",
-  signature(x = "Column"),
-  function(x) {
+  signature(x = "Column", trimString = "missing"),
+ 

spark git commit: [SPARK-22905][ML][FOLLOWUP] Fix GaussianMixtureModel save

2017-12-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4e9e6aee4 -> afc364146


[SPARK-22905][ML][FOLLOWUP] Fix GaussianMixtureModel save

## What changes were proposed in this pull request?
Make sure model data is stored in order: `repartition(1)` shuffles the rows and does not preserve their order, so the data is written from a single-partition RDD instead. cc WeichenXu123

## How was this patch tested?
existing tests

Author: Zheng RuiFeng 

Closes #20113 from zhengruifeng/gmm_save.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afc36414
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afc36414
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afc36414

Branch: refs/heads/master
Commit: afc36414601d3f1a1946ccf2c630f43b7b7246a8
Parents: 4e9e6ae
Author: Zheng RuiFeng 
Authored: Fri Dec 29 10:08:03 2017 -0800
Committer: Joseph K. Bradley 
Committed: Fri Dec 29 10:08:03 2017 -0800

--
 .../org/apache/spark/mllib/clustering/GaussianMixtureModel.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/afc36414/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index afbe4f9..1933d54 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -154,7 +154,7 @@ object GaussianMixtureModel extends 
Loader[GaussianMixtureModel] {
   val dataArray = Array.tabulate(weights.length) { i =>
 Data(weights(i), gaussians(i).mu, gaussians(i).sigma)
   }
-  
spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))
+  spark.createDataFrame(sc.makeRDD(dataArray, 
1)).write.parquet(Loader.dataPath(path))
 }
 
 def load(sc: SparkContext, path: String): GaussianMixtureModel = {





spark git commit: [SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocationManagerSuite.

2017-12-29 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master 8b497046c -> 4e9e6aee4


[SPARK-22864][CORE] Disable allocation schedule in 
ExecutorAllocationManagerSuite.

The scheduled task was racing with the test code and could influence
the values returned to the test, triggering assertions. The change adds
a new config that is only used during testing, and overrides it
on the affected test suite.

The issue in the bug can be reliably reproduced by reducing the interval
in the test (e.g. to 10ms).
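
For reference, a sketch (not the suite's actual setup) of overriding the interval through the new key; per the diff below, the key is only consulted when `Utils.isTesting` is true:

```
// Sketch only, not ExecutorAllocationManagerSuite code.
import org.apache.spark.SparkConf

sys.props("spark.testing") = "true"   // Utils.isTesting checks this property
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  // Make the background polling loop effectively inert so the test can drive
  // the schedule deterministically (value is in milliseconds).
  .set("spark.testing.dynamicAllocation.scheduleInterval", "10000")
```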

While there, fixed an exception that shows up in the logs while these
tests run, and simplified some code (which was also causing misleading
log messages in the log output of the test).

Author: Marcelo Vanzin 

Closes #20050 from vanzin/SPARK-22864.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e9e6aee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e9e6aee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e9e6aee

Branch: refs/heads/master
Commit: 4e9e6aee44bb2ddb41b567d659358b22fd824222
Parents: 8b49704
Author: Marcelo Vanzin 
Authored: Fri Dec 29 10:51:37 2017 -0600
Committer: Imran Rashid 
Committed: Fri Dec 29 10:51:37 2017 -0600

--
 .../spark/ExecutorAllocationManager.scala   |  7 +-
 .../scala/org/apache/spark/SparkContext.scala   | 20 
 .../spark/scheduler/AsyncEventQueue.scala   |  2 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 24 +---
 4 files changed, 28 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 5bc2d9e..2e00dc8 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -141,7 +141,11 @@ private[spark] class ExecutorAllocationManager(
   private val removeTimes = new mutable.HashMap[String, Long]
 
   // Polling loop interval (ms)
-  private val intervalMillis: Long = 100
+  private val intervalMillis: Long = if (Utils.isTesting) {
+  conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+} else {
+  100
+}
 
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new SystemClock()
@@ -856,4 +860,5 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
+  val TESTING_SCHEDULE_INTERVAL_KEY = 
"spark.testing.dynamicAllocation.scheduleInterval"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fcbeddd..1dbb378 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1575,10 +1575,10 @@ class SparkContext(config: SparkConf) extends Logging {
 
   private[spark] def getExecutorIds(): Seq[String] = {
 schedulerBackend match {
-  case b: CoarseGrainedSchedulerBackend =>
+  case b: ExecutorAllocationClient =>
 b.getExecutorIds()
   case _ =>
-logWarning("Requesting executors is only supported in coarse-grained 
mode")
+logWarning("Requesting executors is not supported by current 
scheduler.")
 Nil
 }
   }
@@ -1604,10 +1604,10 @@ class SparkContext(config: SparkConf) extends Logging {
   hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
 ): Boolean = {
 schedulerBackend match {
-  case b: CoarseGrainedSchedulerBackend =>
+  case b: ExecutorAllocationClient =>
 b.requestTotalExecutors(numExecutors, localityAwareTasks, 
hostToLocalTaskCount)
   case _ =>
-logWarning("Requesting executors is only supported in coarse-grained 
mode")
+logWarning("Requesting executors is not supported by current 
scheduler.")
 false
 }
   }
@@ -1620,10 +1620,10 @@ class SparkContext(config: SparkConf) extends Logging {
   @DeveloperApi
   def requestExecutors(numAdditionalExecutors: Int): Boolean = {
 schedulerBackend match {
-  case b: CoarseGrainedSchedulerBackend =>
+  case b: ExecutorAllocationClient =>
 b.requestExecutors(numAdditionalExecutors)
   case _ =>
-logWarning("Requesting executors is only supported in coarse-grained 
mode")
+logWarning("Requesting executors 

spark git commit: [SPARK-20654][CORE] Add config to limit disk usage of the history server.

2017-12-29 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master 11a849b3a -> 8b497046c


[SPARK-20654][CORE] Add config to limit disk usage of the history server.

This change adds a new configuration option and support code that limits
how much disk space the SHS will use. The default value is pretty generous
so that applications will, hopefully, only rarely need to be replayed
because of their disk stores being evicted.

This works by keeping track of how much data each application is using.
Also, because it's not possible to know, before replaying, how much space
will be needed, it's possible that usage will exceed the configured limit
temporarily. The code uses the concept of a "lease" to try to limit how
much the SHS will exceed the limit in those cases.

Active UIs are also tracked, so they're never deleted. This works in
tandem with the existing option of how many active UIs are loaded; because
unused UIs will be unloaded, their disk stores will also become candidates
for deletion. If the data is not deleted, though, re-loading the UI is
pretty quick.
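
The lease idea can be pictured with a small, illustrative-only tracker; the names and structure below are assumptions, not the actual `HistoryServerDiskManager` API:

```
// Illustrative sketch only.
class DiskUsageTracker(maxUsage: Long) {
  private var committed = 0L   // space held by fully replayed, retained app stores
  private var leased = 0L      // space reserved for replays still in progress

  /** Reserve an estimated amount before replaying; may temporarily exceed the limit. */
  def lease(estimate: Long): Long = synchronized {
    leased += estimate
    estimate
  }

  /** Swap the reservation for the real on-disk size once the replay finishes. */
  def commit(leaseAmount: Long, actual: Long): Unit = synchronized {
    leased -= leaseAmount
    committed += actual
  }

  /** When true, disk stores of unloaded (non-active) UIs become eviction candidates. */
  def needsEviction: Boolean = synchronized { committed + leased > maxUsage }
}
```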

Author: Marcelo Vanzin 

Closes #20011 from vanzin/SPARK-20654.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b497046
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b497046
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b497046

Branch: refs/heads/master
Commit: 8b497046c647a21bbed1bdfbdcb176745a1d5cd5
Parents: 11a849b
Author: Marcelo Vanzin 
Authored: Fri Dec 29 10:40:09 2017 -0600
Committer: Imran Rashid 
Committed: Fri Dec 29 10:40:09 2017 -0600

--
 .../deploy/history/FsHistoryProvider.scala  | 173 ++
 .../history/HistoryServerDiskManager.scala  | 327 +++
 .../apache/spark/deploy/history/config.scala|   5 +
 .../spark/scheduler/EventLoggingListener.scala  |  15 +-
 .../history/HistoryServerDiskManagerSuite.scala | 160 +
 5 files changed, 606 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a299b79..94c80eb 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
@@ -39,11 +40,13 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
+import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -149,6 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 }
   }.getOrElse(new InMemoryStore())
 
+  private val diskManager = storePath.map { path =>
+new HistoryServerDiskManager(conf, path, listing, clock)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), 
LoadedAppUI]()
 
   /**
@@ -219,6 +226,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   }
 
   private def startPolling(): Unit = {
+diskManager.foreach(_.initialize())
+
 // Validate the log directory.
 val path = new Path(logDir)
 try {
@@ -299,63 +308,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   attempt.adminAclsGroups.getOrElse(""))
 secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-val uiStorePath = storePath.map { path => getStorePath(path, appId, 
attemptId) }
+val kvstore = try {
+  diskManager match {
+case Some(sm) =>
+  loadDiskStore(sm, appId, attempt)
 
-val (kvstore, needReplay) = uiStorePath match {
-  case Some(path) =>
-try {
-  // The store path is not guaranteed to exist - maybe it hasn't been 
created, or was
-  // invalidated because changes to the event log were detected. Need 
to replay in tha

svn commit: r23946 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_08_01-11a849b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-29 Thread pwendell
Author: pwendell
Date: Fri Dec 29 16:20:09 2017
New Revision: 23946

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_29_08_01-11a849b docs


[This commit notification would consist of 1425 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrunner is installed.

2017-12-29 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master dbd492b7e -> 11a849b3a


[SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrunner is 
installed.

## What changes were proposed in this pull request?

This is a follow-up PR of #19587.

If `xmlrunner` is installed, `VectorizedUDFTests.test_vectorized_udf_check_config` fails with the following error, because `self` (an instance of a `unittest.TestCase` subclass) referenced in the UDF `check_records_per_batch` can no longer be pickled.

```
PicklingError: Cannot pickle files that are not opened for reading: w
```

This changes the UDF not to refer the `self`.

## How was this patch tested?

Tested locally.

Author: Takuya UESHIN 

Closes #20115 from ueshin/issues/SPARK-22370_fup1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11a849b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11a849b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11a849b3

Branch: refs/heads/master
Commit: 11a849b3a7b3d03c48d3e17c8a721acedfd89285
Parents: dbd492b
Author: Takuya UESHIN 
Authored: Fri Dec 29 23:04:28 2017 +0900
Committer: hyukjinkwon 
Committed: Fri Dec 29 23:04:28 2017 +0900

--
 python/pyspark/sql/tests.py | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/11a849b3/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3ef1522..1c34c89 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3825,6 +3825,7 @@ class VectorizedUDFTests(ReusedSQLTestCase):
 
 def test_vectorized_udf_check_config(self):
 from pyspark.sql.functions import pandas_udf, col
+import pandas as pd
 orig_value = 
self.spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch", None)
 self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 3)
 try:
@@ -3832,11 +3833,11 @@ class VectorizedUDFTests(ReusedSQLTestCase):
 
 @pandas_udf(returnType=LongType())
 def check_records_per_batch(x):
-self.assertTrue(x.size <= 3)
-return x
+return pd.Series(x.size).repeat(x.size)
 
-result = df.select(check_records_per_batch(col("id")))
-self.assertEqual(df.collect(), result.collect())
+result = df.select(check_records_per_batch(col("id"))).collect()
+for (r,) in result:
+self.assertTrue(r <= 3)
 finally:
 if orig_value is None:
 
self.spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch")





spark git commit: [SPARK-22921][PROJECT-INFRA] Choices for Assigning Jira on Merge

2017-12-29 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master fcf66a327 -> dbd492b7e


[SPARK-22921][PROJECT-INFRA] Choices for Assigning Jira on Merge

In general, JIRAs are assigned to the original reporter or one of the commenters. This updates the merge script to give you a simple choice to do that, so you don't have to do it manually.

Author: Imran Rashid 

Closes #20107 from squito/SPARK-22921.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbd492b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbd492b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbd492b7

Branch: refs/heads/master
Commit: dbd492b7e293f0bf4c13076ba21deb506c5f0969
Parents: fcf66a3
Author: Imran Rashid 
Authored: Fri Dec 29 07:30:49 2017 -0600
Committer: Sean Owen 
Committed: Fri Dec 29 07:30:49 2017 -0600

--
 dev/merge_spark_pr.py | 28 
 1 file changed, 28 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dbd492b7/dev/merge_spark_pr.py
--
diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index 28971b8..cd5dd1e 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -242,6 +242,9 @@ def resolve_jira_issue(merge_branches, comment, 
default_jira_id=""):
 cur_summary = issue.fields.summary
 cur_assignee = issue.fields.assignee
 if cur_assignee is None:
+cur_assignee = choose_jira_assignee(issue)
+# Check again, we might not have chose an assignee
+if cur_assignee is None:
 cur_assignee = "NOT ASSIGNED!!!"
 else:
 cur_assignee = cur_assignee.displayName
@@ -290,6 +293,31 @@ def resolve_jira_issue(merge_branches, comment, 
default_jira_id=""):
 print("Successfully resolved %s with fixVersions=%s!" % (jira_id, 
fix_versions))
 
 
+def choose_jira_assignee(issue, asf_jira):
+"""
+Prompt the user to choose who to assign the issue to in jira, given a list 
of candidates,
+including the original reporter and all commentors
+"""
+reporter = issue.fields.reporter
+commentors = map(lambda x: x.author, issue.fields.comment.comments)
+candidates = set(commentors)
+candidates.add(reporter)
+candidates = list(candidates)
+print("JIRA is unassigned, choose assignee")
+for idx, author in enumerate(candidates):
+annotations = ["Reporter"] if author == reporter else []
+if author in commentors:
+annotations.append("Commentor")
+print("[%d] %s (%s)" % (idx, author.displayName, 
",".join(annotations)))
+assignee = raw_input("Enter number of user to assign to (blank to leave 
unassigned):")
+if assignee == "":
+return None
+else:
+assignee = candidates[int(assignee)]
+asf_jira.assign_issue(issue.key, assignee.key)
+return assignee
+
+
 def resolve_jira_issues(title, merge_branches, comment):
 jira_ids = re.findall("SPARK-[0-9]{4,5}", title)
 





spark git commit: [SPARK-21657][SQL] optimize explode quadratic memory consumption

2017-12-29 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master cc30ef800 -> fcf66a327


[SPARK-21657][SQL] optimize explode quadratic memory consumption

## What changes were proposed in this pull request?

The issue has been raised in two Jira tickets: [SPARK-21657](https://issues.apache.org/jira/browse/SPARK-21657) and [SPARK-16998](https://issues.apache.org/jira/browse/SPARK-16998). Basically, what happens is that in collection generators like explode/inline we create many rows from each row. Currently each exploded row also contains the column from which it was generated, so if we have a 10k-element array in one row, that array gets copied 10k times, once into each output row. This results in quadratic memory consumption. However, it is a common case that the original column gets projected out after the explode, so we can avoid duplicating it. In this solution we propose to identify this situation in the optimizer and turn on a flag for omitting the original column in the generation process.
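
A minimal sketch (spark-shell style, names made up) of the query shape this targets, where the array column feeds `explode` and is then projected away:

```
// Sketch only: assumes a spark-shell session, i.e. spark.implicits._ in scope.
import org.apache.spark.sql.functions.explode

val wide = Seq((1, Seq.fill(10000)("x"))).toDF("id", "arr")

// Only the generated column survives the projection. Previously every one of
// the 10k output rows still carried a copy of `arr` internally; with the new
// optimizer flag the original column can be omitted during generation.
val exploded = wide.select(explode($"arr").as("elem"))
exploded.count()
```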

## How was this patch tested?

1. We added a benchmark test to MiscBenchmark that shows a 16x improvement in runtime.
2. We ran some of the other tests in MiscBenchmark and they show 15% 
improvements.
3. We ran this code on a specific case from our production data with rows 
containing arrays of size ~200k and it reduced the runtime from 6 hours to 3 
mins.

Author: oraviv 
Author: uzadude 
Author: uzadude <15645757+uzad...@users.noreply.github.com>

Closes #19683 from uzadude/optimize_explode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fcf66a32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fcf66a32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fcf66a32

Branch: refs/heads/master
Commit: fcf66a32760c74e601acb537c51b2311ece6e9d5
Parents: cc30ef8
Author: oraviv 
Authored: Fri Dec 29 21:08:34 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Dec 29 21:08:34 2017 +0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  6 +--
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  4 +-
 .../apache/spark/sql/catalyst/dsl/package.scala |  6 +--
 .../sql/catalyst/optimizer/Optimizer.scala  | 13 +++---
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  2 +-
 .../plans/logical/basicLogicalOperators.scala   | 21 ++
 .../catalyst/optimizer/ColumnPruningSuite.scala | 44 
 .../optimizer/FilterPushdownSuite.scala | 14 +++
 .../sql/catalyst/parser/PlanParserSuite.scala   | 16 ---
 .../scala/org/apache/spark/sql/Dataset.scala|  4 +-
 .../spark/sql/execution/GenerateExec.scala  | 29 ++---
 .../spark/sql/execution/SparkStrategies.scala   |  6 +--
 .../sql/execution/benchmark/MiscBenchmark.scala | 37 
 13 files changed, 128 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fcf66a32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7f2128e..1f7191c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -696,7 +696,7 @@ class Analyzer(
   (oldVersion, oldVersion.copy(aggregateExpressions = 
newAliases(aggregateExpressions)))
 
 case oldVersion: Generate
-if 
oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
+if 
oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
   val newOutput = oldVersion.generatorOutput.map(_.newInstance())
   (oldVersion, oldVersion.copy(generatorOutput = newOutput))
 
@@ -1138,7 +1138,7 @@ class Analyzer(
   case g: Generate =>
 val maybeResolvedExprs = exprs.map(resolveExpression(_, g))
 val (newExprs, newChild) = 
resolveExprsAndAddMissingAttrs(maybeResolvedExprs, g.child)
-(newExprs, g.copy(join = true, child = newChild))
+(newExprs, g.copy(unrequiredChildIndex = Nil, child = newChild))
 
   // For `Distinct` and `SubqueryAlias`, we can't recursively resolve 
and add attributes
   // via its children.
@@ -1578,7 +1578,7 @@ class Analyzer(
 resolvedGenerator =
   Generate(
 generator,
-join = projectList.size > 1, // Only join if there are other 
expressions in SELECT.
+unrequiredChildIndex = Nil,
 outer = outer,
 qualifier = None,
 generatorOutput = 
Re

svn commit: r23944 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_04_01-cc30ef8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-29 Thread pwendell
Author: pwendell
Date: Fri Dec 29 12:19:07 2017
New Revision: 23944

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_29_04_01-cc30ef8 docs


[This commit notification would consist of 1425 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-22916][SQL] shouldn't bias towards build right if user does not specify

2017-12-29 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 224375c55 -> cc30ef800


[SPARK-22916][SQL] shouldn't bias towards build right if user does not specify

## What changes were proposed in this pull request?

When there are no broadcast hints, the current Spark strategies prefer to build the right side, without considering the sizes of the two tables. This patch adds logic that takes the sizes of the two tables into account when choosing the build side. To make the logic clear, the build side is determined in steps (a condensed sketch follows the list):

1. If there are broadcast hints, the build side is determined by `broadcastSideByHints`;
2. If there are no broadcast hints, the build side is determined by `broadcastSideBySizes`;
3. If broadcasting is disabled by the config, it falls back to the next cases.
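
A condensed, illustrative sketch of that decision order (not the actual `SparkStrategies` code, which is in the diff below; join-type checks are omitted):

```
// Illustrative only; the real logic lives in SparkStrategies' broadcastSide* helpers.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

def chooseBuildSide(
    leftHint: Boolean, rightHint: Boolean,
    leftCanBroadcast: Boolean, rightCanBroadcast: Boolean,
    leftSize: BigInt, rightSize: BigInt): Option[BuildSide] = {
  def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft

  if (leftHint || rightHint) {
    // Step 1: explicit hints win; if both sides are hinted, broadcast the smaller one.
    Some(if (leftHint && rightHint) smallerSide
         else if (rightHint) BuildRight
         else BuildLeft)
  } else if (leftCanBroadcast || rightCanBroadcast) {
    // Step 2: no hints, so compare the estimated physical sizes against the threshold.
    Some(if (leftCanBroadcast && rightCanBroadcast) smallerSide
         else if (rightCanBroadcast) BuildRight
         else BuildLeft)
  } else {
    // Step 3: neither side can be broadcast; fall through to other join strategies.
    None
  }
}
```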

## How was this patch tested?


Author: Feng Liu 

Closes #20099 from liufengdb/fix-spark-strategies.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc30ef80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc30ef80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc30ef80

Branch: refs/heads/master
Commit: cc30ef8009b82c71a4b8e9caba82ed141761ab85
Parents: 224375c
Author: Feng Liu 
Authored: Fri Dec 29 18:48:47 2017 +0800
Committer: gatorsmile 
Committed: Fri Dec 29 18:48:47 2017 +0800

--
 .../spark/sql/execution/SparkStrategies.scala   | 75 
 .../execution/joins/BroadcastJoinSuite.scala| 75 +++-
 .../sql/execution/metric/SQLMetricsSuite.scala  | 15 ++--
 3 files changed, 116 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cc30ef80/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6b3f301..0ed7c2f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -158,45 +158,65 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   def smallerSide =
 if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else 
BuildLeft
 
-  val buildRight = canBuildRight && right.stats.hints.broadcast
-  val buildLeft = canBuildLeft && left.stats.hints.broadcast
-
-  if (buildRight && buildLeft) {
+  if (canBuildRight && canBuildLeft) {
 // Broadcast smaller side base on its estimated physical size
 // if both sides have broadcast hint
 smallerSide
-  } else if (buildRight) {
+  } else if (canBuildRight) {
 BuildRight
-  } else if (buildLeft) {
+  } else if (canBuildLeft) {
 BuildLeft
-  } else if (canBuildRight && canBuildLeft) {
+  } else {
 // for the last default broadcast nested loop join
 smallerSide
-  } else {
-throw new AnalysisException("Can not decide which side to broadcast 
for this join")
   }
 }
 
+private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, 
right: LogicalPlan)
+  : Boolean = {
+  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
+  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
+  buildLeft || buildRight
+}
+
+private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, 
right: LogicalPlan)
+  : BuildSide = {
+  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
+  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
+  broadcastSide(buildLeft, buildRight, left, right)
+}
+
+private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, 
right: LogicalPlan)
+  : Boolean = {
+  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
+  val buildRight = canBuildRight(joinType) && canBroadcast(right)
+  buildLeft || buildRight
+}
+
+private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, 
right: LogicalPlan)
+  : BuildSide = {
+  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
+  val buildRight = canBuildRight(joinType) && canBroadcast(right)
+  broadcastSide(buildLeft, buildRight, left, right)
+}
+
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
 
   // --- Broadc

svn commit: r23943 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_00_01-224375c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-29 Thread pwendell
Author: pwendell
Date: Fri Dec 29 08:15:45 2017
New Revision: 23943

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_29_00_01-224375c docs


[This commit notification would consist of 1425 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]
