svn commit: r29697 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_22_02-3f20305-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Wed Sep 26 05:17:17 2018
New Revision: 29697

Log:
Apache Spark 2.4.1-SNAPSHOT-2018_09_25_22_02-3f20305 docs


[This commit notification would consist of 1472 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[1/2] spark git commit: [MINOR][PYTHON] Use a helper in `PythonUtils` instead of direct accessing Scala package

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4f10aff40 -> ef3616825


[MINOR][PYTHON] Use a helper in `PythonUtils` instead of direct accessing Scala 
package

## What changes were proposed in this pull request?

This PR proposes to add a helper in `PythonUtils` instead of directly 
accessing the Scala package.
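
For illustration, here is the Py4J access pattern being replaced next to the new helper call. This is a sketch only: it assumes an active `SparkContext` `sc` on the PySpark driver and a build that contains this patch.

```python
# Old: a val defined on the Scala package object org.apache.spark.internal.config
# is only reachable through Py4J's mangled names ("package$" / "MODULE$").
conf_entry = (sc._jvm.org.apache.spark.internal.config
              .__getattr__("package$").__getattr__("MODULE$")
              .IO_ENCRYPTION_ENABLED())
enabled = sc._jsc.sc().conf().get(conf_entry)

# New: a single JVM-side helper hides the name mangling from Python.
enabled = sc._jvm.PythonUtils.getEncryptionEnabled(sc._jsc)
```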

## How was this patch tested?

Jenkins tests.

Closes #22483 from HyukjinKwon/minor-refactoring.

Authored-by: hyukjinkwon 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ad6693b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ad6693b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ad6693b

Branch: refs/heads/branch-2.2
Commit: 8ad6693bd27f3e130fbd51518de880997a1cdcc7
Parents: 4f10aff
Author: hyukjinkwon 
Authored: Fri Sep 21 00:41:42 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Sep 26 10:50:38 2018 +0800

--
 .../src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 4 
 python/pyspark/context.py| 4 +---
 2 files changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ad6693b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 27a5e19..cdce371 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -74,4 +74,8 @@ private[spark] object PythonUtils {
   def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
 jm.asScala.toMap
   }
+
+  def getEncryptionEnabled(sc: JavaSparkContext): Boolean = {
+sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ad6693b/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 68e4c17..171e143 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -192,9 +192,7 @@ class SparkContext(object):
 # If encryption is enabled, we need to setup a server in the jvm to 
read broadcast
 # data via a socket.
 # scala's mangled names w/ $ in them require special treatment.
-encryption_conf = 
self._jvm.org.apache.spark.internal.config.__getattr__("package$")\
-.__getattr__("MODULE$").IO_ENCRYPTION_ENABLED()
-self._encryption_enabled = self._jsc.sc().conf().get(encryption_conf)
+self._encryption_enabled = 
self._jvm.PythonUtils.getEncryptionEnabled(self._jsc)
 
 self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
 self.pythonVer = "%d.%d" % sys.version_info[:2]


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[2/2] spark git commit: [SPARKR] Match pyspark features in SparkR communication protocol

2018-09-25 Thread gurwls223
[SPARKR] Match pyspark features in SparkR communication protocol


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef361682
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef361682
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef361682

Branch: refs/heads/branch-2.2
Commit: ef36168258b8ad15362312e0562794f4f07322d0
Parents: 8ad6693
Author: hyukjinkwon 
Authored: Mon Sep 24 19:25:02 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Sep 26 10:50:46 2018 +0800

--
 R/pkg/R/context.R   | 43 ++--
 R/pkg/tests/fulltests/test_Serde.R  | 32 +++
 R/pkg/tests/fulltests/test_sparkSQL.R   | 12 --
 .../scala/org/apache/spark/api/r/RRDD.scala | 33 ++-
 .../scala/org/apache/spark/api/r/RUtils.scala   |  4 ++
 5 files changed, 98 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef361682/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 50856e3..c1a12f5 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -168,18 +168,30 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # 2-tuples of raws
   serializedSlices <- lapply(slices, serialize, connection = NULL)
 
-  # The PRC backend cannot handle arguments larger than 2GB (INT_MAX)
+  # The RPC backend cannot handle arguments larger than 2GB (INT_MAX)
   # If serialized data is safely less than that threshold we send it over the 
PRC channel.
   # Otherwise, we write it to a file and send the file name
   if (objectSize < sizeLimit) {
 jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", 
sc, serializedSlices)
   } else {
-fileName <- writeToTempFile(serializedSlices)
-jrdd <- tryCatch(callJStatic(
-"org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, 
as.integer(numSlices)),
-  finally = {
-file.remove(fileName)
-})
+if (callJStatic("org.apache.spark.api.r.RUtils", "getEncryptionEnabled", 
sc)) {
+  # the length of slices here is the parallelism to use in the jvm's 
sc.parallelize()
+  parallelism <- as.integer(numSlices)
+  jserver <- newJObject("org.apache.spark.api.r.RParallelizeServer", sc, 
parallelism)
+  authSecret <- callJMethod(jserver, "secret")
+  port <- callJMethod(jserver, "port")
+  conn <- socketConnection(port = port, blocking = TRUE, open = "wb", 
timeout = 1500)
+  doServerAuth(conn, authSecret)
+  writeToConnection(serializedSlices, conn)
+  jrdd <- callJMethod(jserver, "getResult")
+} else {
+  fileName <- writeToTempFile(serializedSlices)
+  jrdd <- tryCatch(callJStatic(
+  "org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, 
as.integer(numSlices)),
+finally = {
+  file.remove(fileName)
+  })
+}
   }
 
   RDD(jrdd, "byte")
@@ -195,14 +207,21 @@ getMaxAllocationLimit <- function(sc) {
   ))
 }
 
+writeToConnection <- function(serializedSlices, conn) {
+  tryCatch({
+for (slice in serializedSlices) {
+  writeBin(as.integer(length(slice)), conn, endian = "big")
+  writeBin(slice, conn, endian = "big")
+}
+  }, finally = {
+close(conn)
+  })
+}
+
 writeToTempFile <- function(serializedSlices) {
   fileName <- tempfile()
   conn <- file(fileName, "wb")
-  for (slice in serializedSlices) {
-writeBin(as.integer(length(slice)), conn, endian = "big")
-writeBin(slice, conn, endian = "big")
-  }
-  close(conn)
+  writeToConnection(serializedSlices, conn)
   fileName
 }
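
For reference, the wire format used by `writeToConnection()` above is a simple length-prefixed framing: each serialized slice is written as a 4-byte big-endian length followed by the raw bytes. Below is a minimal Python sketch of the same framing, for illustration only (`write_slices` is a hypothetical name; `out` can be any binary file-like object, e.g. a socket's `makefile("wb")` or an open temp file, which is why the R code shares this logic between the socket and file paths).

```python
import struct

def write_slices(out, serialized_slices):
    # Mirror writeToConnection(): 4-byte big-endian length, then the payload,
    # for every slice; close the stream when done (even on error).
    try:
        for chunk in serialized_slices:
            out.write(struct.pack(">i", len(chunk)))
            out.write(chunk)
    finally:
        out.close()
```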
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ef361682/R/pkg/tests/fulltests/test_Serde.R
--
diff --git a/R/pkg/tests/fulltests/test_Serde.R 
b/R/pkg/tests/fulltests/test_Serde.R
index 6bbd201..092f9b8 100644
--- a/R/pkg/tests/fulltests/test_Serde.R
+++ b/R/pkg/tests/fulltests/test_Serde.R
@@ -77,3 +77,35 @@ test_that("SerDe of list of lists", {
 })
 
 sparkR.session.stop()
+
+# Note that this test should be at the end of tests since the configurations 
used here are not
+# specific to sessions, and the Spark context is restarted.
+test_that("createDataFrame large objects", {
+  for (encryptionEnabled in list("true", "false")) {
+# To simulate a large object scenario, we set spark.r.maxAllocationLimit 
to a smaller value
+conf <- list(spark.r.maxAllocationLimit = "100",
+ spark.io.encryption.enabled = encryptionEnabled)
+
+suppressWarnings(sparkR.session(master = sparkRTestMaster,
+sparkConfig = conf,
+enableHiveSupport = FALSE))
+
+sc <- getSparkContext()
+actual <- 

svn commit: r29683 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_20_02-473d0d8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Wed Sep 26 03:17:32 2018
New Revision: 29683

Log:
Apache Spark 2.5.0-SNAPSHOT-2018_09_25_20_02-473d0d8 docs


[This commit notification would consist of 1485 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25514][SQL] Generating pretty JSON by to_json

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master cb77a6689 -> 473d0d862


[SPARK-25514][SQL] Generating pretty JSON by to_json

## What changes were proposed in this pull request?

The PR introduces a new JSON option, `pretty`, which turns on the 
`DefaultPrettyPrinter` of Jackson's JSON generator. The new option is useful for 
exploring deeply nested columns and for converting JSON columns into a more 
readable representation (see the added test).
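
A minimal PySpark usage sketch, assuming an active `spark` session on a build that includes this patch:

```python
from pyspark.sql import Row
from pyspark.sql.functions import struct, to_json

df = spark.createDataFrame([Row(id=1, arr=[1, 2, 3])])
# "pretty" is passed through like any other JSON option; with this patch it
# switches the generator to Jackson's DefaultPrettyPrinter.
df.select(to_json(struct("id", "arr"), {"pretty": "true"}).alias("json")).show(truncate=False)
```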

## How was this patch tested?

Added a round-trip test which converts a JSON string to a pretty representation via 
`from_json()` and `to_json()`.

Closes #22534 from MaxGekk/pretty-json.

Lead-authored-by: Maxim Gekk 
Co-authored-by: Maxim Gekk 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/473d0d86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/473d0d86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/473d0d86

Branch: refs/heads/master
Commit: 473d0d862de54ec1c7a8f0354fa5e06f3d66e455
Parents: cb77a66
Author: Maxim Gekk 
Authored: Wed Sep 26 09:52:15 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Sep 26 09:52:15 2018 +0800

--
 R/pkg/R/functions.R |  5 +++--
 python/pyspark/sql/functions.py |  4 +++-
 .../spark/sql/catalyst/json/JSONOptions.scala   |  5 +
 .../sql/catalyst/json/JacksonGenerator.scala|  5 -
 .../scala/org/apache/spark/sql/functions.scala  |  4 
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 21 
 6 files changed, 40 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/473d0d86/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 572dee5..6425c9d 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -198,8 +198,9 @@ NULL
 #'  }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, 
this contains
 #'additional named properties to control how it is converted, 
accepts the same
-#'options as the JSON data source.  In \code{arrays_zip}, this 
contains additional
-#'Columns of arrays to be merged.
+#'options as the JSON data source. Additionally \code{to_json} 
supports the "pretty"
+#'option which enables pretty JSON generation. In 
\code{arrays_zip}, this contains
+#'additional Columns of arrays to be merged.
 #' @name column_collection_functions
 #' @rdname column_collection_functions
 #' @family collection functions

http://git-wip-us.apache.org/repos/asf/spark/blob/473d0d86/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 6da5237..1c3d972 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2295,7 +2295,9 @@ def to_json(col, options={}):
 into a JSON string. Throws an exception, in the case of an unsupported 
type.
 
 :param col: name of column containing a struct, an array or a map.
-:param options: options to control converting. accepts the same options as 
the JSON datasource
+:param options: options to control converting. accepts the same options as 
the JSON datasource.
+Additionally the function supports the `pretty` option 
which enables
+pretty JSON generation.
 
 >>> from pyspark.sql import Row
 >>> from pyspark.sql.types import *

http://git-wip-us.apache.org/repos/asf/spark/blob/473d0d86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 47eeb70..64152e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -113,6 +113,11 @@ private[sql] class JSONOptions(
   }
   val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
 
+  /**
+   * Generating JSON strings in pretty representation if the parameter is 
enabled.
+   */
+  val pretty: Boolean = 
parameters.get("pretty").map(_.toBoolean).getOrElse(false)
+
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
 factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)


spark git commit: [SPARK-21291][R] add R partitionBy API in DataFrame

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 8c2edf46d -> cb77a6689


[SPARK-21291][R] add R partitionBy API in DataFrame

## What changes were proposed in this pull request?

add an R `partitionBy` API in `write.df`.
I didn't add `bucketBy` in `write.df` because the last line of `write.df` is
```
write <- handledCallJMethod(write, "save")
```
and `save` doesn't support `bucketBy` right now:
```
 assertNotBucketed("save")
```
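
For comparison, the new R argument forwards to the same `DataFrameWriter.partitionBy` that PySpark already exposes. A sketch, assuming an active `spark` session (the output path is a placeholder):

```python
df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "1")], ["key", "value"])
# Lay the output out on the file system partitioned by "key", Hive-style.
df.write.partitionBy("key").mode("overwrite").text("/tmp/partitioned_text")
```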

## How was this patch tested?

Add unit test in test_sparkSQL.R

Closes #22537 from huaxingao/spark-21291.

Authored-by: Huaxin Gao 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb77a668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb77a668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb77a668

Branch: refs/heads/master
Commit: cb77a6689137916e64bc5692b0c942e86ca1a0ea
Parents: 8c2edf4
Author: Huaxin Gao 
Authored: Wed Sep 26 09:37:44 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Sep 26 09:37:44 2018 +0800

--
 R/pkg/R/DataFrame.R   | 17 +++--
 R/pkg/tests/fulltests/test_sparkSQL.R |  8 
 2 files changed, 23 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a1cb478..3469188 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2954,6 +2954,9 @@ setMethod("exceptAll",
 #' @param source a name for external data source.
 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
 #' save mode (it is 'error' by default)
+#' @param partitionBy a name or a list of names of columns to partition the 
output by on the file
+#'system. If specified, the output is laid out on the file 
system similar
+#'to Hive's partitioning scheme.
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2965,13 +2968,13 @@ setMethod("exceptAll",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
-#' write.df(df, "myfile", "parquet", "overwrite")
+#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", 
"col2"))
 #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
 #' }
 #' @note write.df since 1.4.0
 setMethod("write.df",
   signature(df = "SparkDataFrame"),
-  function(df, path = NULL, source = NULL, mode = "error", ...) {
+  function(df, path = NULL, source = NULL, mode = "error", partitionBy 
= NULL, ...) {
 if (!is.null(path) && !is.character(path)) {
   stop("path should be character, NULL or omitted.")
 }
@@ -2985,8 +2988,18 @@ setMethod("write.df",
 if (is.null(source)) {
   source <- getDefaultSqlSource()
 }
+cols <- NULL
+if (!is.null(partitionBy)) {
+  if (!all(sapply(partitionBy, function(c) is.character(c {
+stop("All partitionBy column names should be characters.")
+  }
+  cols <- as.list(partitionBy)
+}
 write <- callJMethod(df@sdf, "write")
 write <- callJMethod(write, "format", source)
+if (!is.null(cols)) {
+  write <- callJMethod(write, "partitionBy", cols)
+}
 write <- setWriteOptions(write, path = path, mode = mode, ...)
 write <- handledCallJMethod(write, "save")
   })

http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index a874bfb..50eff37 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2701,8 +2701,16 @@ test_that("read/write text files", {
   expect_equal(colnames(df2), c("value"))
   expect_equal(count(df2), count(df) * 2)
 
+  df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), 
list(2L, "2")),
+ schema = c("key", "value"))
+  textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt")
+  write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key")
+  df4 <- read.df(textPath3, "text")
+  expect_equal(count(df3), count(df4))
+
   unlink(textPath)
   unlink(textPath2)
+  unlink(textPath3)
 })
 
 test_that("read/write text files - compression option", {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: 

spark git commit: [SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 f91247f81 -> 3f203050a


[SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to 
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

## What changes were proposed in this pull request?

Add the legacy prefix for 
spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to 
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
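
For context, a minimal grouped map Pandas UDF that this conf affects; a sketch assuming an active `spark` session with pyarrow available:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ["id", "v"])

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

# Default ("true"): result columns are matched to the declared schema by name.
# Setting the renamed legacy conf to "false" restores positional assignment.
spark.conf.set("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "false")
df.groupby("id").apply(subtract_mean).show()
```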

## How was this patch tested?
The existing tests.

Closes #22540 from gatorsmile/renameAssignColumnsByPosition.

Lead-authored-by: gatorsmile 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: hyukjinkwon 
(cherry picked from commit 8c2edf46d0f89e5ec54968218d89f30a3f8190bc)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f203050
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f203050
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f203050

Branch: refs/heads/branch-2.4
Commit: 3f203050ac764516e68fb43628bba0df5963e44d
Parents: f91247f
Author: gatorsmile 
Authored: Wed Sep 26 09:32:51 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Sep 26 09:33:13 2018 +0800

--
 python/pyspark/sql/tests.py   |  3 ++-
 python/pyspark/worker.py  |  7 ---
 .../org/apache/spark/sql/internal/SQLConf.scala   | 18 +-
 .../spark/sql/execution/arrow/ArrowUtils.scala|  9 +++--
 4 files changed, 18 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 9fa1577..cb186de 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -5799,7 +5799,8 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 import pandas as pd
 from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-with 
self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": 
True}):
+with self.sql_conf({
+
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
 
 @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP)
 def foo(_):

http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 974344f..8c59f1f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -97,8 +97,9 @@ def wrap_scalar_pandas_udf(f, return_type):
 
 
 def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
-assign_cols_by_pos = runner_conf.get(
-"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False)
+assign_cols_by_name = runner_conf.get(
+"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", 
"true")
+assign_cols_by_name = assign_cols_by_name.lower() == "true"
 
 def wrapped(key_series, value_series):
 import pandas as pd
@@ -119,7 +120,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
 
 # Assign result columns by schema name if user labeled with strings, 
else use position
-if not assign_cols_by_pos and any(isinstance(name, basestring) for 
name in result.columns):
+if assign_cols_by_name and any(isinstance(name, basestring) for name 
in result.columns):
 return [(result[field.name], to_arrow_type(field.dataType)) for 
field in return_type]
 else:
 return [(result[result.columns[i]], to_arrow_type(field.dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2788402..68daf9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1275,15 +1275,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
-  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
-buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
+  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME =
+
buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName")
   .internal()
-  .doc("When true, a grouped map 

spark git commit: [SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 9bb3a0c67 -> 8c2edf46d


[SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to 
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

## What changes were proposed in this pull request?

Add the legacy prefix for 
spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to 
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

## How was this patch tested?
The existing tests.

Closes #22540 from gatorsmile/renameAssignColumnsByPosition.

Lead-authored-by: gatorsmile 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c2edf46
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c2edf46
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c2edf46

Branch: refs/heads/master
Commit: 8c2edf46d0f89e5ec54968218d89f30a3f8190bc
Parents: 9bb3a0c
Author: gatorsmile 
Authored: Wed Sep 26 09:32:51 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Sep 26 09:32:51 2018 +0800

--
 python/pyspark/sql/tests.py   |  3 ++-
 python/pyspark/worker.py  |  7 ---
 .../org/apache/spark/sql/internal/SQLConf.scala   | 18 +-
 .../spark/sql/execution/arrow/ArrowUtils.scala|  9 +++--
 4 files changed, 18 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b829bae..74642d4 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -5802,7 +5802,8 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 import pandas as pd
 from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-with 
self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": 
True}):
+with self.sql_conf({
+
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
 
 @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP)
 def foo(_):

http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 974344f..8c59f1f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -97,8 +97,9 @@ def wrap_scalar_pandas_udf(f, return_type):
 
 
 def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
-assign_cols_by_pos = runner_conf.get(
-"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False)
+assign_cols_by_name = runner_conf.get(
+"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", 
"true")
+assign_cols_by_name = assign_cols_by_name.lower() == "true"
 
 def wrapped(key_series, value_series):
 import pandas as pd
@@ -119,7 +120,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
 
 # Assign result columns by schema name if user labeled with strings, 
else use position
-if not assign_cols_by_pos and any(isinstance(name, basestring) for 
name in result.columns):
+if assign_cols_by_name and any(isinstance(name, basestring) for name 
in result.columns):
 return [(result[field.name], to_arrow_type(field.dataType)) for 
field in return_type]
 else:
 return [(result[result.columns[i]], to_arrow_type(field.dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0e0a01d..e7c9a83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1295,15 +1295,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
-  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
-buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
+  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME =
+
buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName")
   .internal()
-  .doc("When true, a grouped map Pandas UDF will assign columns from the 
returned " +
-"Pandas DataFrame based on position, 

svn commit: r29678 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_18_02-f91247f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Wed Sep 26 01:16:56 2018
New Revision: 29678

Log:
Apache Spark 2.4.1-SNAPSHOT-2018_09_25_18_02-f91247f docs


[This commit notification would consist of 1472 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25422][CORE] Don't memory map blocks streamed to disk.

2018-09-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 544f86a69 -> f91247f81


[SPARK-25422][CORE] Don't memory map blocks streamed to disk.

After data has been streamed to disk, the buffers are inserted into the
memory store in some cases (e.g., with broadcast blocks). But broadcast
code also disposes of those buffers once the data has been read, to
ensure that we don't leave mapped buffers using up memory, and the memory
store is then left holding disposed mappings, i.e. garbage data.
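
As a plain-Python illustration (not Spark code) of the two strategies the TODO comments below refer to, memory-mapping a file versus reading it into a buffer; the file path is hypothetical:

```python
import mmap

path = "/tmp/streamed_block.bin"  # hypothetical file

# Memory-mapping: the OS pages the file in lazily, and the mapping occupies
# address space (and resident pages) until it is explicitly closed/disposed.
with open(path, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
    header = bytes(m[:4])

# Reading into a buffer: the whole file lands in process memory up front
# (the OOM risk the comments mention), but no mapping lingers afterwards.
with open(path, "rb") as f:
    data = f.read()
```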

## How was this patch tested?

Ran the old failing test in a loop. Full tests on jenkins

Closes #22546 from squito/SPARK-25422-master.

Authored-by: Imran Rashid 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 9bb3a0c67bd851b09ff4701ef1d280e2a77d791b)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f91247f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f91247f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f91247f8

Branch: refs/heads/branch-2.4
Commit: f91247f812f87daa9fe4ec23b100f2310254df22
Parents: 544f86a
Author: Imran Rashid 
Authored: Wed Sep 26 08:45:27 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Sep 26 08:45:56 2018 +0800

--
 .../org/apache/spark/storage/BlockManager.scala | 13 +++---
 .../spark/util/io/ChunkedByteBuffer.scala   | 47 +++-
 2 files changed, 31 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f91247f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2234146..0fe82ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -438,10 +438,8 @@ private[spark] class BlockManager(
 // stream.
 channel.close()
 // TODO SPARK-25035 Even if we're only going to write the data to disk 
after this, we end up
-// using a lot of memory here.  With encryption, we'll read the whole 
file into a regular
-// byte buffer and OOM.  Without encryption, we'll memory map the file 
and won't get a jvm
-// OOM, but might get killed by the OS / cluster manager.  We could at 
least read the tmp
-// file as a stream in both cases.
+// using a lot of memory here. We'll read the whole file into a regular
+// byte buffer and OOM.  We could at least read the tmp file as a 
stream.
 val buffer = securityManager.getIOEncryptionKey() match {
   case Some(key) =>
 // we need to pass in the size of the unencrypted block
@@ -453,7 +451,7 @@ private[spark] class BlockManager(
 new EncryptedBlockData(tmpFile, blockSize, conf, 
key).toChunkedByteBuffer(allocator)
 
   case None =>
-ChunkedByteBuffer.map(tmpFile, 
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+ChunkedByteBuffer.fromFile(tmpFile, 
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
 }
 putBytes(blockId, buffer, level)(classTag)
 tmpFile.delete()
@@ -726,10 +724,9 @@ private[spark] class BlockManager(
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
 // TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
-// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// could just use the inputStream on the temp file, rather than reading 
the file into memory.
 // Until then, replication can cause the process to use too much memory 
and get killed
-// by the OS / cluster manager (not a java OOM, since it's a memory-mapped 
file) even though
-// we've read the data to disk.
+// even though we've read the data to disk.
 logDebug(s"Getting remote block $blockId")
 require(blockId != null, "BlockId is null")
 var runningFailureCount = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/f91247f8/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 39f050f..4aa8d45 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -19,17 +19,16 @@ package org.apache.spark.util.io
 
 import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.{FileChannel, WritableByteChannel}

spark git commit: [SPARK-25422][CORE] Don't memory map blocks streamed to disk.

2018-09-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 66d29870c -> 9bb3a0c67


[SPARK-25422][CORE] Don't memory map blocks streamed to disk.

After data has been streamed to disk, the buffers are inserted into the
memory store in some cases (e.g., with broadcast blocks). But broadcast
code also disposes of those buffers once the data has been read, to
ensure that we don't leave mapped buffers using up memory, and the memory
store is then left holding disposed mappings, i.e. garbage data.

## How was this patch tested?

Ran the old failing test in a loop. Full tests on jenkins

Closes #22546 from squito/SPARK-25422-master.

Authored-by: Imran Rashid 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bb3a0c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bb3a0c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bb3a0c6

Branch: refs/heads/master
Commit: 9bb3a0c67bd851b09ff4701ef1d280e2a77d791b
Parents: 66d2987
Author: Imran Rashid 
Authored: Wed Sep 26 08:45:27 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Sep 26 08:45:27 2018 +0800

--
 .../org/apache/spark/storage/BlockManager.scala | 13 +++---
 .../spark/util/io/ChunkedByteBuffer.scala   | 47 +++-
 2 files changed, 31 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9bb3a0c6/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2234146..0fe82ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -438,10 +438,8 @@ private[spark] class BlockManager(
 // stream.
 channel.close()
 // TODO SPARK-25035 Even if we're only going to write the data to disk 
after this, we end up
-// using a lot of memory here.  With encryption, we'll read the whole 
file into a regular
-// byte buffer and OOM.  Without encryption, we'll memory map the file 
and won't get a jvm
-// OOM, but might get killed by the OS / cluster manager.  We could at 
least read the tmp
-// file as a stream in both cases.
+// using a lot of memory here. We'll read the whole file into a regular
+// byte buffer and OOM.  We could at least read the tmp file as a 
stream.
 val buffer = securityManager.getIOEncryptionKey() match {
   case Some(key) =>
 // we need to pass in the size of the unencrypted block
@@ -453,7 +451,7 @@ private[spark] class BlockManager(
 new EncryptedBlockData(tmpFile, blockSize, conf, 
key).toChunkedByteBuffer(allocator)
 
   case None =>
-ChunkedByteBuffer.map(tmpFile, 
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+ChunkedByteBuffer.fromFile(tmpFile, 
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
 }
 putBytes(blockId, buffer, level)(classTag)
 tmpFile.delete()
@@ -726,10 +724,9 @@ private[spark] class BlockManager(
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
 // TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
-// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// could just use the inputStream on the temp file, rather than reading 
the file into memory.
 // Until then, replication can cause the process to use too much memory 
and get killed
-// by the OS / cluster manager (not a java OOM, since it's a memory-mapped 
file) even though
-// we've read the data to disk.
+// even though we've read the data to disk.
 logDebug(s"Getting remote block $blockId")
 require(blockId != null, "BlockId is null")
 var runningFailureCount = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/9bb3a0c6/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 39f050f..4aa8d45 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -19,17 +19,16 @@ package org.apache.spark.util.io
 
 import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.{FileChannel, WritableByteChannel}
-import java.nio.file.StandardOpenOption
-
-import scala.collection.mutable.ListBuffer
+import 

svn commit: r29675 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_14_02-544f86a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Tue Sep 25 21:17:02 2018
New Revision: 29675

Log:
Apache Spark 2.4.1-SNAPSHOT-2018_09_25_14_02-544f86a docs


[This commit notification would consist of 1472 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r29672 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_12_02-66d2987-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Tue Sep 25 19:17:33 2018
New Revision: 29672

Log:
Apache Spark 2.5.0-SNAPSHOT-2018_09_25_12_02-66d2987 docs


[This commit notification would consist of 1485 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25495][SS] FetchedData.reset should reset all fields

2018-09-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 a709718da -> 544f86a69


[SPARK-25495][SS] FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and 
`_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may 
make the Kafka connector return wrong results.
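
A PySpark sketch of the Kafka source configuration exercised by the new test (broker address and topic name are placeholders):

```python
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafka.isolation.level", "read_committed")
      .option("startingOffsets", "earliest")
      .option("subscribe", "some_topic")
      .load()
      .selectExpr("CAST(value AS STRING)"))
```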

## How was this patch tested?

The new unit test.

Closes #22507 from zsxwing/fix-kafka-reset.

Lead-authored-by: Shixiong Zhu 
Co-authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 66d29870c09e6050dd846336e596faaa8b0d14ad)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/544f86a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/544f86a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/544f86a6

Branch: refs/heads/branch-2.4
Commit: 544f86a69bba94dfcb241e41c799ed63ef4210fc
Parents: a709718
Author: Shixiong Zhu 
Authored: Tue Sep 25 11:42:27 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Sep 25 11:42:39 2018 -0700

--
 .../spark/sql/kafka010/KafkaDataConsumer.scala  |  5 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 52 
 2 files changed, 56 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index ceb9e31..7b1314b 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer(
 /** Reset the internal pre-fetched data. */
 def reset(): Unit = {
   _records = ju.Collections.emptyListIterator()
+  _nextOffsetInFetchedData = UNKNOWN_OFFSET
+  _offsetAfterPoll = UNKNOWN_OFFSET
 }
 
 /**
@@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer(
   if (offset < fetchedData.offsetAfterPoll) {
 // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. 
Return a record to ask
 // the next call to start from `fetchedData.offsetAfterPoll`.
+val nextOffsetToFetch = fetchedData.offsetAfterPoll
 fetchedData.reset()
-return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
+return fetchedRecord.withRecord(null, nextOffsetToFetch)
   } else {
 // Fetch records from Kafka and update `fetchedData`.
 fetchData(offset, pollTimeoutMs)

http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 65615fd..e0b6d8c 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -853,6 +853,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Send more message before the tasks of the current batch start 
reading the current 

spark git commit: [SPARK-25495][SS] FetchedData.reset should reset all fields

2018-09-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 04db03537 -> 66d29870c


[SPARK-25495][SS] FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and 
`_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may 
make the Kafka connector return wrong results.

## How was this patch tested?

The new unit test.

Closes #22507 from zsxwing/fix-kafka-reset.

Lead-authored-by: Shixiong Zhu 
Co-authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66d29870
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66d29870
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66d29870

Branch: refs/heads/master
Commit: 66d29870c09e6050dd846336e596faaa8b0d14ad
Parents: 04db035
Author: Shixiong Zhu 
Authored: Tue Sep 25 11:42:27 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Sep 25 11:42:27 2018 -0700

--
 .../spark/sql/kafka010/KafkaDataConsumer.scala  |  5 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 52 
 2 files changed, 56 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index ceb9e31..7b1314b 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer(
 /** Reset the internal pre-fetched data. */
 def reset(): Unit = {
   _records = ju.Collections.emptyListIterator()
+  _nextOffsetInFetchedData = UNKNOWN_OFFSET
+  _offsetAfterPoll = UNKNOWN_OFFSET
 }
 
 /**
@@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer(
   if (offset < fetchedData.offsetAfterPoll) {
 // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. 
Return a record to ask
 // the next call to start from `fetchedData.offsetAfterPoll`.
+val nextOffsetToFetch = fetchedData.offsetAfterPoll
 fetchedData.reset()
-return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
+return fetchedRecord.withRecord(null, nextOffsetToFetch)
   } else {
 // Fetch records from Kafka and update `fetchedData`.
 fetchData(offset, pollTimeoutMs)

http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index e5f0088..39c2cde 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -874,6 +874,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Send more message before the tasks of the current batch start 
reading the current batch
+// data, so that the executors will prefetch messages in the next 
batch and drop them. In
+  

spark git commit: [SPARK-25486][TEST] Refactor SortBenchmark to use main method

2018-09-25 Thread dongjoon
Repository: spark
Updated Branches:
  refs/heads/master 9cbd001e2 -> 04db03537


[SPARK-25486][TEST] Refactor SortBenchmark to use main method

## What changes were proposed in this pull request?

Refactor SortBenchmark to use main method.
Generate benchmark result:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain 
org.apache.spark.sql.execution.benchmark.SortBenchmark"
```

## How was this patch tested?

manual tests

Closes #22495 from yucai/SPARK-25486.

Authored-by: yucai 
Signed-off-by: Dongjoon Hyun 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04db0353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04db0353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04db0353

Branch: refs/heads/master
Commit: 04db035378012907c93f6e5b4faa6ec11f1fc67b
Parents: 9cbd001
Author: yucai 
Authored: Tue Sep 25 11:13:05 2018 -0700
Committer: Dongjoon Hyun 
Committed: Tue Sep 25 11:13:05 2018 -0700

--
 sql/core/benchmarks/SortBenchmark-results.txt   | 17 +
 .../sql/execution/benchmark/SortBenchmark.scala | 38 +---
 2 files changed, 33 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/04db0353/sql/core/benchmarks/SortBenchmark-results.txt
--
diff --git a/sql/core/benchmarks/SortBenchmark-results.txt 
b/sql/core/benchmarks/SortBenchmark-results.txt
new file mode 100644
index 000..0d00a0c
--- /dev/null
+++ b/sql/core/benchmarks/SortBenchmark-results.txt
@@ -0,0 +1,17 @@
+
+radix sort
+
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+radix sort 2500: Best/Avg Time(ms)Rate(M/s)   Per 
Row(ns)   Relative
+
+reference TimSort key prefix array  11770 / 11960  2.1 
470.8   1.0X
+reference Arrays.sort 2106 / 2128 11.9 
 84.3   5.6X
+radix sort one byte 93 /  100269.7 
  3.7 126.9X
+radix sort two bytes   171 /  179146.0 
  6.9  68.7X
+radix sort eight bytes 659 /  664 37.9 
 26.4  17.9X
+radix sort key prefix array   1024 / 1053 24.4 
 41.0  11.5X
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/04db0353/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
index 17619ec..958a064 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark
 
 import java.util.{Arrays, Comparator}
 
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.unsafe.array.LongArray
 import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.collection.Sorter
@@ -28,12 +28,15 @@ import org.apache.spark.util.random.XORShiftRandom
 
 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.SortBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class  
+ *   2. build/sbt "sql/test:runMain "
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/test:runMain "
+ *  Results will be written to "benchmarks/-results.txt".
+ * }}}
  */
-class SortBenchmark extends BenchmarkWithCodegen {
+object SortBenchmark extends BenchmarkBase {
 
   private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: 
PrefixComparator) {
 val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new 
Array[Long](buf.size().toInt)))
@@ -54,10 +57,10 @@ class SortBenchmark extends BenchmarkWithCodegen {
   new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
-  ignore("sort") {
+  def sortBenchmark(): Unit = {
 val size = 2500
 val rand = new XORShiftRandom(123)
-val benchmark = new 

[3/3] spark git commit: [PYSPARK][SQL] Updates to RowQueue

2018-09-25 Thread irashid
[PYSPARK][SQL] Updates to RowQueue

Tested with updates to RowQueueSuite

(cherry picked from commit 6d742d1bd71aa3803dce91a830b37284cb18cf70)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f10aff4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f10aff4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f10aff4

Branch: refs/heads/branch-2.2
Commit: 4f10aff403ccc8287a816cb94ddf7f11e185907a
Parents: dd0e7cf
Author: Imran Rashid 
Authored: Thu Sep 6 12:11:47 2018 -0500
Committer: Imran Rashid 
Committed: Tue Sep 25 11:46:06 2018 -0500

--
 .../spark/sql/execution/python/RowQueue.scala   | 27 ++-
 .../sql/execution/python/RowQueueSuite.scala| 28 +++-
 2 files changed, 41 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f10aff4/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
index cd1e77f..4d6820c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
@@ -21,9 +21,10 @@ import java.io._
 
 import com.google.common.io.Closeables
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.memory.MemoryBlock
@@ -108,9 +109,13 @@ private[python] abstract class InMemoryRowQueue(val page: 
MemoryBlock, numFields
  * A RowQueue that is backed by a file on disk. This queue will stop accepting 
new rows once any
  * reader has begun reading from the queue.
  */
-private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
-  private var out = new DataOutputStream(
-new BufferedOutputStream(new FileOutputStream(file.toString)))
+private[python] case class DiskRowQueue(
+file: File,
+fields: Int,
+serMgr: SerializerManager) extends RowQueue {
+
+  private var out = new DataOutputStream(serMgr.wrapForEncryption(
+new BufferedOutputStream(new FileOutputStream(file.toString
   private var unreadBytes = 0L
 
   private var in: DataInputStream = _
@@ -131,7 +136,8 @@ private[python] case class DiskRowQueue(file: File, fields: 
Int) extends RowQueu
 if (out != null) {
   out.close()
   out = null
-  in = new DataInputStream(new NioBufferedFileInputStream(file))
+  in = new DataInputStream(serMgr.wrapForEncryption(
+new NioBufferedFileInputStream(file)))
 }
 
 if (unreadBytes > 0) {
@@ -166,7 +172,8 @@ private[python] case class DiskRowQueue(file: File, fields: 
Int) extends RowQueu
 private[python] case class HybridRowQueue(
 memManager: TaskMemoryManager,
 tempDir: File,
-numFields: Int)
+numFields: Int,
+serMgr: SerializerManager)
   extends MemoryConsumer(memManager) with RowQueue {
 
   // Each buffer should have at least one row
@@ -212,7 +219,7 @@ private[python] case class HybridRowQueue(
   }
 
   private def createDiskQueue(): RowQueue = {
-DiskRowQueue(File.createTempFile("buffer", "", tempDir), numFields)
+DiskRowQueue(File.createTempFile("buffer", "", tempDir), numFields, serMgr)
   }
 
   private def createNewQueue(required: Long): RowQueue = {
@@ -279,3 +286,9 @@ private[python] case class HybridRowQueue(
 }
   }
 }
+
+private[python] object HybridRowQueue {
+  def apply(taskMemoryMgr: TaskMemoryManager, file: File, fields: Int): 
HybridRowQueue = {
+HybridRowQueue(taskMemoryMgr, file, fields, SparkEnv.get.serializerManager)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4f10aff4/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
index ffda33c..1ec9986 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
@@ -20,12 +20,15 @@ package org.apache.spark.sql.execution.python
 import java.io.File
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import 

[1/3] spark git commit: [SPARK-25253][PYSPARK] Refactor local connection & auth code

2018-09-25 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 bd12eb75d -> 4f10aff40


[SPARK-25253][PYSPARK] Refactor local connection & auth code

This eliminates some duplication in the code that connects to a server on 
localhost to talk directly to the JVM, and gives consistent IPv6 and error 
handling. Two other incidental changes that shouldn't matter:
1) Python barrier tasks perform authentication immediately (rather than waiting 
for the BARRIER_FUNCTION indicator)
2) for `rdd._load_from_socket`, the timeout is only increased after 
authentication (see the sketch below).
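
A sketch of the caller side after this refactor, mirroring the shape of `rdd._load_from_socket` in this patch; `sock_info` is the (port, auth_secret) pair handed back by the JVM:

```python
from pyspark.java_gateway import local_connect_and_auth

def _load_from_socket(sock_info, serializer):
    (sockfile, sock) = local_connect_and_auth(*sock_info)
    # Only after authenticating do we relax the socket timeout for the
    # potentially slow materialization read.
    sock.settimeout(None)
    return serializer.load_stream(sockfile)
```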

Closes #22247 from squito/py_connection_refactor.

Authored-by: Imran Rashid 
Signed-off-by: hyukjinkwon 
(cherry picked from commit 38391c9aa8a88fcebb337934f30298a32d91596b)
(cherry picked from commit a2a54a5f49364a1825932c9f04eb0ff82dd7d465)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc1c4e7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc1c4e7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc1c4e7d

Branch: refs/heads/branch-2.2
Commit: fc1c4e7d24f7d0afb3b79d66aa9812e7dddc2f38
Parents: bd12eb7
Author: Imran Rashid 
Authored: Wed Aug 29 09:47:38 2018 +0800
Committer: Imran Rashid 
Committed: Tue Sep 25 11:45:59 2018 -0500

--
 python/pyspark/java_gateway.py | 32 +++-
 python/pyspark/rdd.py  | 24 ++--
 python/pyspark/worker.py   |  7 ++-
 3 files changed, 35 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc1c4e7d/python/pyspark/java_gateway.py
--
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 7abf2c1..191dfce 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -133,7 +133,7 @@ def launch_gateway(conf=None):
 return gateway
 
 
-def do_server_auth(conn, auth_secret):
+def _do_server_auth(conn, auth_secret):
 """
Performs the authentication protocol defined by the SocketAuthHelper class on the given
 file-like object 'conn'.
@@ -144,3 +144,33 @@ def do_server_auth(conn, auth_secret):
 if reply != "ok":
 conn.close()
 raise Exception("Unexpected reply from iterator server.")
+
+
+def local_connect_and_auth(port, auth_secret):
+"""
+Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.
+Handles IPV4 & IPV6, does some error handling.
+:param port
+:param auth_secret
+:return: a tuple with (sockfile, sock)
+"""
+sock = None
+errors = []
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+af, socktype, proto, _, sa = res
+try:
+sock = socket.socket(af, socktype, proto)
+sock.settimeout(15)
+sock.connect(sa)
+sockfile = sock.makefile("rwb", 65536)
+_do_server_auth(sockfile, auth_secret)
+return (sockfile, sock)
+except socket.error as e:
+emsg = _exception_message(e)
+errors.append("tried to connect to %s, but an error occured: %s" % 
(sa, emsg))
+sock.close()
+sock = None
+else:
+raise Exception("could not open socket: %s" % errors)

http://git-wip-us.apache.org/repos/asf/spark/blob/fc1c4e7d/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 864cebb..7d84cbd 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -39,7 +39,7 @@ if sys.version > '3':
 else:
 from itertools import imap as map, ifilter as filter
 
-from pyspark.java_gateway import do_server_auth
+from pyspark.java_gateway import local_connect_and_auth
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
 BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
 PickleSerializer, pack_long, AutoBatchedSerializer, write_with_length, \
@@ -122,30 +122,10 @@ def _parse_memory(s):
 
 
 def _load_from_socket(sock_info, serializer):
-port, auth_secret = sock_info
-sock = None
-# Support for both IPv4 and IPv6.
-# On most of IPv6-ready systems, IPv6 will take precedence.
-for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
-af, socktype, proto, canonname, sa = res
-sock = socket.socket(af, socktype, proto)
-try:
-sock.settimeout(15)
-sock.connect(sa)
-except socket.error:
-sock.close()
-sock = None
-continue
-break
-  

[2/3] spark git commit: [PYSPARK] Updates to pyspark broadcast

2018-09-25 Thread irashid
[PYSPARK] Updates to pyspark broadcast

(cherry picked from commit 09dd34cb1706f2477a89174d6a1a0f17ed5b0a65)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd0e7cf5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd0e7cf5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd0e7cf5

Branch: refs/heads/branch-2.2
Commit: dd0e7cf5287148618404593ca095dd900b6e993f
Parents: fc1c4e7
Author: Imran Rashid 
Authored: Mon Aug 13 21:35:34 2018 -0500
Committer: Imran Rashid 
Committed: Tue Sep 25 11:46:03 2018 -0500

--
 .../org/apache/spark/api/python/PythonRDD.scala | 349 ---
 .../spark/api/python/PythonRDDSuite.scala   |  23 +-
 dev/sparktestsupport/modules.py |   2 +
 python/pyspark/broadcast.py |  58 ++-
 python/pyspark/context.py   |  63 +++-
 python/pyspark/serializers.py   |  58 +++
 python/pyspark/test_broadcast.py| 126 +++
 python/pyspark/test_serializers.py  |  90 +
 python/pyspark/worker.py|  24 +-
 9 files changed, 705 insertions(+), 88 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dd0e7cf5/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 7b5a179..2f4e3bc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,8 +24,10 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
 import scala.language.existentials
-import scala.util.control.NonFatal
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -37,6 +39,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
@@ -293,19 +296,51 @@ private[spark] class PythonRunner(
 val newBids = broadcastVars.map(_.id).toSet
 // number of different broadcasts
 val toRemove = oldBids.diff(newBids)
-val cnt = toRemove.size + newBids.diff(oldBids).size
+val addedBids = newBids.diff(oldBids)
+val cnt = toRemove.size + addedBids.size
+val needsDecryptionServer = env.serializerManager.encryptionEnabled && addedBids.nonEmpty
+dataOut.writeBoolean(needsDecryptionServer)
 dataOut.writeInt(cnt)
-for (bid <- toRemove) {
-  // remove the broadcast from worker
-  dataOut.writeLong(- bid - 1)  // bid >= 0
-  oldBids.remove(bid)
+def sendBidsToRemove(): Unit = {
+  for (bid <- toRemove) {
+// remove the broadcast from worker
+dataOut.writeLong(-bid - 1) // bid >= 0
+oldBids.remove(bid)
+  }
 }
-for (broadcast <- broadcastVars) {
-  if (!oldBids.contains(broadcast.id)) {
+if (needsDecryptionServer) {
+  // if there is encryption, we setup a server which reads the encrypted files, and sends
+// the decrypted data to python
+  val idsAndFiles = broadcastVars.flatMap { broadcast =>
+  if (oldBids.contains(broadcast.id)) {
+  None
+} else {
+  Some((broadcast.id, broadcast.value.path))
+}
+}
+  val server = new EncryptedPythonBroadcastServer(env, idsAndFiles)
+  dataOut.writeInt(server.port)
+  logTrace(s"broadcast decryption server setup on ${server.port}")
+  PythonRDD.writeUTF(server.secret, dataOut)
+  sendBidsToRemove()
+  idsAndFiles.foreach { case (id, _) =>
 // send new broadcast
-dataOut.writeLong(broadcast.id)
-PythonRDD.writeUTF(broadcast.value.path, dataOut)
-oldBids.add(broadcast.id)
+dataOut.writeLong(id)
+oldBids.add(id)
+  }
+  dataOut.flush()
+  logTrace("waiting for python to read decrypted broadcast data from 
server")
+  server.waitTillBroadcastDataSent()
+  logTrace("done sending decrypted data to python")
+} else {
+ 

svn commit: r29666 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_08_02-9cbd001-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Tue Sep 25 15:17:19 2018
New Revision: 29666

Log:
Apache Spark 2.5.0-SNAPSHOT-2018_09_25_08_02-9cbd001 docs


[This commit notification would consist of 1485 parts,
which exceeds the limit of 50 parts, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r29662 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_06_02-a709718-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Tue Sep 25 13:17:02 2018
New Revision: 29662

Log:
Apache Spark 2.4.1-SNAPSHOT-2018_09_25_06_02-a709718 docs


[This commit notification would consist of 1472 parts,
which exceeds the limit of 50 parts, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23907][SQL] Revert regr_* functions entirely

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 4ca4ef7b9 -> a709718da


[SPARK-23907][SQL] Revert regr_* functions entirely

## What changes were proposed in this pull request?
This patch reverts entirely all the regr_* functions added in SPARK-23907. 
These were added by mgaido91 (and proposed by gatorsmile) to improve 
compatibility with other database systems, without any actual use cases. 
However, they are very rarely used, and in Spark there are much better ways to 
compute these functions, due to Spark's flexibility in exposing real 
programming APIs.

I'm going through all the APIs added in Spark 2.4 and I think we should revert 
these. If there are strong enough demands and more use cases, we can add them 
back in the future pretty easily.
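
To make that concrete, here is a hedged PySpark sketch (not part of this
commit) of how the reverted aggregates can be reproduced with functions that
remain in Spark; the DataFrame, column names, and sample values are
illustrative only:

# Hedged sketch: regr_slope/regr_intercept/regr_count rebuilt from covar_pop,
# var_pop, avg and count, which are still available in Spark.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[1]").appName("regr-equivalents").getOrCreate()
df = spark.createDataFrame([(1.0, 2.1), (2.0, 3.9), (3.0, 6.1)], ["x", "y"])

# The regr_* aggregates only consider rows where both x and y are non-null.
pairs = df.where(F.col("x").isNotNull() & F.col("y").isNotNull())
row = pairs.agg(
    F.covar_pop("y", "x").alias("cov"),
    F.var_pop("x").alias("var_x"),
    F.avg("x").alias("avg_x"),
    F.avg("y").alias("avg_y"),
    F.count(F.lit(1)).alias("regr_count"),
).first()

regr_slope = row["cov"] / row["var_x"]                     # covar_pop(y, x) / var_pop(x)
regr_intercept = row["avg_y"] - regr_slope * row["avg_x"]  # avg(y) - slope * avg(x)
print(regr_slope, regr_intercept, row["regr_count"])
spark.stop()

regr_count is simply the number of non-null (x, y) pairs, and regr_r2 can
usually be recovered as the square of corr("y", "x") over the same pairs.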

## How was this patch tested?
Reverted test cases also.

Closes #22541 from rxin/SPARK-23907.

Authored-by: Reynold Xin 
Signed-off-by: hyukjinkwon 
(cherry picked from commit 9cbd001e2476cd06aa0bcfcc77a21a9077d5797a)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a709718d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a709718d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a709718d

Branch: refs/heads/branch-2.4
Commit: a709718dae495725af4e961b1e0f85bce5d34368
Parents: 4ca4ef7
Author: Reynold Xin 
Authored: Tue Sep 25 20:13:07 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Sep 25 20:13:22 2018 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala|   9 -
 .../expressions/aggregate/regression.scala  | 190 ---
 .../sql-tests/inputs/udaf-regrfunctions.sql |  56 --
 .../results/udaf-regrfunctions.sql.out  |  93 -
 4 files changed, 348 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a709718d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 77860e1..695267a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -299,15 +299,6 @@ object FunctionRegistry {
 expression[CollectList]("collect_list"),
 expression[CollectSet]("collect_set"),
 expression[CountMinSketchAgg]("count_min_sketch"),
-expression[RegrCount]("regr_count"),
-expression[RegrSXX]("regr_sxx"),
-expression[RegrSYY]("regr_syy"),
-expression[RegrAvgX]("regr_avgx"),
-expression[RegrAvgY]("regr_avgy"),
-expression[RegrSXY]("regr_sxy"),
-expression[RegrSlope]("regr_slope"),
-expression[RegrR2]("regr_r2"),
-expression[RegrIntercept]("regr_intercept"),
 
 // string functions
 expression[Ascii]("ascii"),

http://git-wip-us.apache.org/repos/asf/spark/blob/a709718d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
deleted file mode 100644
index d8f4505..000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{AbstractDataType, DoubleType}
-
-/**
- * Base trait for all regression functions.
- */
-trait RegrLike extends AggregateFunction with 

spark git commit: [SPARK-23907][SQL] Revert regr_* functions entirely

2018-09-25 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 7d8f5b62c -> 9cbd001e2


[SPARK-23907][SQL] Revert regr_* functions entirely

## What changes were proposed in this pull request?
This patch reverts entirely all the regr_* functions added in SPARK-23907. 
These were added by mgaido91 (and proposed by gatorsmile) to improve 
compatibility with other database systems, without any actual use cases. 
However, they are very rarely used, and in Spark there are much better ways to 
compute these functions, due to Spark's flexibility in exposing real 
programming APIs.

I'm going through all the APIs added in Spark 2.4 and I think we should revert 
these. If there are strong enough demands and more use cases, we can add them 
back in the future pretty easily.

## How was this patch tested?
Reverted test cases also.

Closes #22541 from rxin/SPARK-23907.

Authored-by: Reynold Xin 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cbd001e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cbd001e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cbd001e

Branch: refs/heads/master
Commit: 9cbd001e2476cd06aa0bcfcc77a21a9077d5797a
Parents: 7d8f5b6
Author: Reynold Xin 
Authored: Tue Sep 25 20:13:07 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Sep 25 20:13:07 2018 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala|   9 -
 .../expressions/aggregate/regression.scala  | 190 ---
 .../sql-tests/inputs/udaf-regrfunctions.sql |  56 --
 .../results/udaf-regrfunctions.sql.out  |  93 -
 4 files changed, 348 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9cbd001e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 8b69a47..7dafebf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -300,15 +300,6 @@ object FunctionRegistry {
 expression[CollectList]("collect_list"),
 expression[CollectSet]("collect_set"),
 expression[CountMinSketchAgg]("count_min_sketch"),
-expression[RegrCount]("regr_count"),
-expression[RegrSXX]("regr_sxx"),
-expression[RegrSYY]("regr_syy"),
-expression[RegrAvgX]("regr_avgx"),
-expression[RegrAvgY]("regr_avgy"),
-expression[RegrSXY]("regr_sxy"),
-expression[RegrSlope]("regr_slope"),
-expression[RegrR2]("regr_r2"),
-expression[RegrIntercept]("regr_intercept"),
 
 // string functions
 expression[Ascii]("ascii"),

http://git-wip-us.apache.org/repos/asf/spark/blob/9cbd001e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
deleted file mode 100644
index d8f4505..000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{AbstractDataType, DoubleType}
-
-/**
- * Base trait for all regression functions.
- */
-trait RegrLike extends AggregateFunction with ImplicitCastInputTypes {
-  def y: Expression
-  def x: Expression
-
-  override def children: Seq[Expression] = 

svn commit: r29652 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_00_02-7d8f5b6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-09-25 Thread pwendell
Author: pwendell
Date: Tue Sep 25 07:17:33 2018
New Revision: 29652

Log:
Apache Spark 2.5.0-SNAPSHOT-2018_09_25_00_02-7d8f5b6 docs


[This commit notification would consist of 1485 parts,
which exceeds the limit of 50 parts, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] Git Push Summary

2018-09-25 Thread jshao
Repository: spark
Updated Tags:  refs/tags/v2.3.2 [created] 02b510728

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org