spark git commit: [MINOR] Fix Scala 2.12 build

2018-08-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 8e6427871 -> 3e4f1666a


[MINOR] Fix Scala 2.12 build

## What changes were proposed in this pull request?

[SPARK-25095](https://github.com/apache/spark/commit/ad45299d047c10472fd3a86103930fe7c54a4cf1) introduced an `ambiguous reference to overloaded definition` compile error:

```
[error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
[error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
[error] and  method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] match argument types (org.apache.spark.TaskContext => Unit)
[error]   context.addTaskCompletionListener(_ => server.close())
[error]   ^
[error] one error found
[error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
```
which fails the Scala 2.12 branch build.

## How was this patch tested?

Existing tests

Closes #9 from dbtsai/fix-2.12-build.

Authored-by: DB Tsai 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e4f1666
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e4f1666
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e4f1666

Branch: refs/heads/master
Commit: 3e4f1666a1253f9d5df05c19b1ce77fe18e9fde3
Parents: 8e64278
Author: DB Tsai 
Authored: Sat Aug 25 13:48:46 2018 +0800
Committer: hyukjinkwon 
Committed: Sat Aug 25 13:48:46 2018 +0800

--
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3e4f1666/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f824191..151c910 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -239,7 +239,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 }
 // Close ServerSocket on task completion.
 serverSocket.foreach { server =>
-  context.addTaskCompletionListener(_ => server.close())
+  context.addTaskCompletionListener[Unit](_ => server.close())
 }
 val boundPort: Int = serverSocket.map(_.getLocalPort).getOrElse(0)
 if (boundPort == -1) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Correct missing punctuation in the documentation

2018-08-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 9714fa547 -> 8e6427871


Correct missing punctuation in the documentation

Closes #22189 from movrsprbp/patch-1.

Authored-by: jaroslav chládek 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e642787
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e642787
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e642787

Branch: refs/heads/master
Commit: 8e6427871a40b82f4a7a28aaa6e197e4e01dc878
Parents: 9714fa5
Author: jaroslav chládek 
Authored: Sat Aug 25 12:49:48 2018 +0800
Committer: hyukjinkwon 
Committed: Sat Aug 25 12:49:48 2018 +0800

--
 docs/structured-streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8e642787/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 355a6cc..73de189 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1005,7 +1005,7 @@ Here is an illustration.
 
 As shown in the illustration, the maximum event time tracked by the engine is 
the 
 *blue dashed line*, and the watermark set as `(max event time - '10 mins')`
-at the beginning of every trigger is the red line  For example, when the 
engine observes the data 
+at the beginning of every trigger is the red line. For example, when the 
engine observes the data 
 `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
 This watermark lets the engine maintain intermediate state for additional 10 
minutes to allow late
 data to be counted. For example, the data `(12:09, cat)` is out of order and 
late, and it falls in





svn commit: r28952 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_24_18_01-f598382-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-24 Thread pwendell
Author: pwendell
Date: Sat Aug 25 01:15:33 2018
New Revision: 28952

Log:
Apache Spark 2.3.3-SNAPSHOT-2018_08_24_18_01-f598382 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




svn commit: r28948 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_24_16_01-9714fa5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-24 Thread pwendell
Author: pwendell
Date: Fri Aug 24 23:16:03 2018
New Revision: 28948

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_24_16_01-9714fa5 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25124][ML] VectorSizeHint setSize and getSize don't return values backport to 2.3

2018-08-24 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 42c1fdd22 -> f5983823e


[SPARK-25124][ML] VectorSizeHint setSize and getSize don't return values 
backport to 2.3

## What changes were proposed in this pull request?
In `feature.py`, `VectorSizeHint.setSize` and `VectorSizeHint.getSize` don't return a value. This patch adds the missing `return` statements.
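
The effect of a missing `return` can be seen in a minimal sketch. The classes below are hypothetical stand-ins, not the real PySpark `VectorSizeHint`; they only assume that the setter is meant to return the object itself (so calls can be chained) and the getter the stored value.

```python
class SizeHintBefore:
    """Hypothetical stand-in showing the behaviour before the fix."""

    def __init__(self):
        self._size = None

    def setSize(self, value):
        self._size = value  # nothing returned, so callers get None

    def getSize(self):
        self._size  # value is evaluated and discarded; callers get None


class SizeHintAfter(SizeHintBefore):
    """Same stand-in with the missing return statements added."""

    def setSize(self, value):
        self._size = value
        return self  # returning the object allows chained calls

    def getSize(self):
        return self._size


before = SizeHintBefore()
before.setSize(3)
after = SizeHintAfter().setSize(3)
print(before.getSize())  # None
print(after.getSize())   # 3
```

Without the `return`, an assertion such as `assertEqual(sizeHint.getSize(), 3)` cannot pass, which is what the added unit test checks.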

## How was this patch tested?

Unit Test added

Closes #8 from huaxingao/spark-25124-2.3.

Authored-by: Huaxin Gao 
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5983823
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5983823
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5983823

Branch: refs/heads/branch-2.3
Commit: f5983823e9b4a3b4762481306ea071a73f5742fc
Parents: 42c1fdd
Author: Huaxin Gao 
Authored: Fri Aug 24 15:41:18 2018 -0700
Committer: Joseph K. Bradley 
Committed: Fri Aug 24 15:41:18 2018 -0700

--
 python/pyspark/ml/feature.py |  4 ++--
 python/pyspark/ml/tests.py   | 17 +
 2 files changed, 19 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5983823/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 04b07e6..a444fe0 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -3673,12 +3673,12 @@ class VectorSizeHint(JavaTransformer, HasInputCol, HasHandleInvalid, JavaMLReada
     @since("2.3.0")
     def getSize(self):
         """ Gets size param, the size of vectors in `inputCol`."""
-        self.getOrDefault(self.size)
+        return self.getOrDefault(self.size)
 
     @since("2.3.0")
     def setSize(self, value):
         """ Sets size param, the size of vectors in `inputCol`."""
-        self._set(size=value)
+        return self._set(size=value)
 
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/spark/blob/f5983823/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 1af2b91..49912d2 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -678,6 +678,23 @@ class FeatureTests(SparkSessionTestCase):
         expected2 = [Row(id=0, indexed=0.0), Row(id=1, indexed=1.0)]
         self.assertEqual(actual2, expected2)
 
+    def test_vector_size_hint(self):
+        df = self.spark.createDataFrame(
+            [(0, Vectors.dense([0.0, 10.0, 0.5])),
+             (1, Vectors.dense([1.0, 11.0, 0.5, 0.6])),
+             (2, Vectors.dense([2.0, 12.0]))],
+            ["id", "vector"])
+
+        sizeHint = VectorSizeHint(
+            inputCol="vector",
+            handleInvalid="skip")
+        sizeHint.setSize(3)
+        self.assertEqual(sizeHint.getSize(), 3)
+
+        output = sizeHint.transform(df).head().vector
+        expected = DenseVector([0.0, 10.0, 0.5])
+        self.assertEqual(output, expected)
+
 
 class HasInducedError(Params):
 





spark git commit: [SPARK-25234][SPARKR] avoid integer overflow in parallelize

2018-08-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fcc9bd632 -> 42c1fdd22


[SPARK-25234][SPARKR] avoid integer overflow in parallelize

## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. It 
might cause integer overflow.
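
As a rough illustration of the problem (written in Python rather than R, and only mimicking R's behaviour): R integers are 32-bit, and an integer product outside that range yields `NA` with a warning. The helper `r_int_mul` below is a hypothetical stand-in for that behaviour, with `None` playing the role of `NA`. Converting to floating point first, as the patch does with `as.numeric`, avoids the overflow.

```python
import math

INT32_MAX = 2**31 - 1  # largest value an R integer can hold


def r_int_mul(a, b):
    """Hypothetical stand-in for R integer multiplication; None plays the role of NA."""
    product = a * b
    return product if abs(product) <= INT32_MAX else None


def slice_bounds(x, length, num_slices):
    """Compute (start, end) for slice x using floating-point arithmetic."""
    start = math.trunc((float(x) * length) / num_slices)
    end = math.trunc(((float(x) + 1) * length) / num_slices)
    return start, end


# The case from the unit test: 47000 elements in 47000 slices.
print(r_int_mul(47000, 47000))            # None: 2,209,000,000 does not fit in 32 bits
print(slice_bounds(46999, 47000, 47000))  # (46999, 47000)
```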

## How was this patch tested?

unit test

Closes #5 from mengxr/SPARK-25234.

Authored-by: Xiangrui Meng 
Signed-off-by: Xiangrui Meng 
(cherry picked from commit 9714fa547325ed7b6a8066a88957537936b233dd)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42c1fdd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42c1fdd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42c1fdd2

Branch: refs/heads/branch-2.3
Commit: 42c1fdd229b3cf19ff804b7516eae9d36ae50c81
Parents: fcc9bd6
Author: Xiangrui Meng 
Authored: Fri Aug 24 15:03:00 2018 -0700
Committer: Xiangrui Meng 
Committed: Fri Aug 24 15:04:11 2018 -0700

--
 R/pkg/R/context.R| 9 -
 R/pkg/tests/fulltests/test_context.R | 7 +++
 2 files changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42c1fdd2/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 443c2ff..25e2d15 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -138,11 +138,10 @@ parallelize <- function(sc, coll, numSlices = 1) {
 
   sizeLimit <- getMaxAllocationLimit(sc)
   objectSize <- object.size(coll)
+  len <- length(coll)
 
   # For large objects we make sure the size of each slice is also smaller than 
sizeLimit
-  numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
-  if (numSerializedSlices > length(coll))
-numSerializedSlices <- length(coll)
+  numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / 
sizeLimit)))
 
   # Generate the slice ids to put each row
   # For instance, for numSerializedSlices of 22, length of 50
@@ -153,8 +152,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
   splits <- if (numSerializedSlices > 0) {
 unlist(lapply(0: (numSerializedSlices - 1), function(x) {
   # nolint start
-  start <- trunc((x * length(coll)) / numSerializedSlices)
-  end <- trunc(((x + 1) * length(coll)) / numSerializedSlices)
+  start <- trunc((as.numeric(x) * len) / numSerializedSlices)
+  end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
   # nolint end
   rep(start, end - start)
 }))

http://git-wip-us.apache.org/repos/asf/spark/blob/42c1fdd2/R/pkg/tests/fulltests/test_context.R
--
diff --git a/R/pkg/tests/fulltests/test_context.R 
b/R/pkg/tests/fulltests/test_context.R
index f0d0a51..288a271 100644
--- a/R/pkg/tests/fulltests/test_context.R
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -240,3 +240,10 @@ test_that("add and get file to be downloaded with Spark 
job on every node", {
   unlink(path, recursive = TRUE)
   sparkR.session.stop()
 })
+
+test_that("SPARK-25234: parallelize should not have integer overflow", {
+  sc <- sparkR.sparkContext(master = sparkRTestMaster)
+  # 47000 * 47000 exceeds integer range
+  parallelize(sc, 1:47000, 47000)
+  sparkR.session.stop()
+})





spark git commit: [SPARK-25234][SPARKR] avoid integer overflow in parallelize

2018-08-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master f8346d2fc -> 9714fa547


[SPARK-25234][SPARKR] avoid integer overflow in parallelize

## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. It 
might cause integer overflow.

## How was this patch tested?

unit test

Closes #5 from mengxr/SPARK-25234.

Authored-by: Xiangrui Meng 
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9714fa54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9714fa54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9714fa54

Branch: refs/heads/master
Commit: 9714fa547325ed7b6a8066a88957537936b233dd
Parents: f8346d2
Author: Xiangrui Meng 
Authored: Fri Aug 24 15:03:00 2018 -0700
Committer: Xiangrui Meng 
Committed: Fri Aug 24 15:03:00 2018 -0700

--
 R/pkg/R/context.R| 9 -
 R/pkg/tests/fulltests/test_context.R | 7 +++
 2 files changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9714fa54/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 7e77ea4..f168ca7 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -138,11 +138,10 @@ parallelize <- function(sc, coll, numSlices = 1) {
 
   sizeLimit <- getMaxAllocationLimit(sc)
   objectSize <- object.size(coll)
+  len <- length(coll)
 
   # For large objects we make sure the size of each slice is also smaller than 
sizeLimit
-  numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
-  if (numSerializedSlices > length(coll))
-numSerializedSlices <- length(coll)
+  numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / 
sizeLimit)))
 
   # Generate the slice ids to put each row
   # For instance, for numSerializedSlices of 22, length of 50
@@ -153,8 +152,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
   splits <- if (numSerializedSlices > 0) {
 unlist(lapply(0: (numSerializedSlices - 1), function(x) {
   # nolint start
-  start <- trunc((x * length(coll)) / numSerializedSlices)
-  end <- trunc(((x + 1) * length(coll)) / numSerializedSlices)
+  start <- trunc((as.numeric(x) * len) / numSerializedSlices)
+  end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
   # nolint end
   rep(start, end - start)
 }))

http://git-wip-us.apache.org/repos/asf/spark/blob/9714fa54/R/pkg/tests/fulltests/test_context.R
--
diff --git a/R/pkg/tests/fulltests/test_context.R 
b/R/pkg/tests/fulltests/test_context.R
index f0d0a51..288a271 100644
--- a/R/pkg/tests/fulltests/test_context.R
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -240,3 +240,10 @@ test_that("add and get file to be downloaded with Spark 
job on every node", {
   unlink(path, recursive = TRUE)
   sparkR.session.stop()
 })
+
+test_that("SPARK-25234: parallelize should not have integer overflow", {
+  sc <- sparkR.sparkContext(master = sparkRTestMaster)
+  # 47000 * 47000 exceeds integer range
+  parallelize(sc, 1:47000, 47000)
+  sparkR.session.stop()
+})





spark git commit: [SPARK-25174][YARN] Limit the size of diagnostic message for am to unregister itself from rm

2018-08-24 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 8bb9414aa -> f8346d2fc


[SPARK-25174][YARN] Limit the size of diagnostic message for am to unregister 
itself from rm

## What changes were proposed in this pull request?

With older Spark releases, one use case generated a huge code-gen file that hit the limitation `Constant pool has grown past JVM limit of 0x`. In this situation the application should fail immediately. However, the diagnostic message sent to the RM was too large, so the ApplicationMaster hung and the RM's ZKStateStore crashed. In Spark 2.3 and later this code-gen limitation has been removed, but other uncaught exceptions that carry oversized error messages could still cause the same problem.

This PR aims to cut down the size of the diagnostic message.
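
The truncation idea can be sketched as follows. This is an illustrative Python helper, not the Scala code in the patch; it assumes the common abbreviate-with-ellipsis behaviour (keep the first `max_width - 3` characters and append `...`). The name `abbreviate` and the limit used in the example are chosen for illustration only.

```python
def abbreviate(msg, max_width):
    """Shorten msg to at most max_width characters, ending in '...' when cut."""
    if msg is None:
        return None
    if max_width < 4:
        raise ValueError("max_width must be at least 4")
    if len(msg) <= max_width:
        return msg  # short messages pass through unchanged
    return msg[:max_width - 3] + "..."


# A very long diagnostic message is cut down to the configured limit.
huge_message = "x" * 5_000_000
final_message = abbreviate(huge_message, 1024)
print(len(final_message))  # 1024
```

Bounding the message before it is handed to the ResourceManager keeps the size of the stored diagnostic predictable, whatever the size of the underlying exception text.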

Closes #22180 from yaooqinn/SPARK-25174.

Authored-by: Kent Yao 
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8346d2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8346d2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8346d2f

Branch: refs/heads/master
Commit: f8346d2fc01f1e881e4e3f9c4499bf5f9e3ceb3f
Parents: 8bb9414
Author: Kent Yao 
Authored: Fri Aug 24 13:44:19 2018 -0700
Committer: Marcelo Vanzin 
Committed: Fri Aug 24 13:44:19 2018 -0700

--
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 5 +++--
 .../src/main/scala/org/apache/spark/deploy/yarn/config.scala   | 6 ++
 2 files changed, 9 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f8346d2f/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 55ed114..8f94e3f 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{Socket, URI, URL}
+import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
@@ -28,6 +28,7 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
+import org.apache.commons.lang3.{StringUtils => ComStrUtils}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
@@ -368,7 +369,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
 logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
   Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-finalMsg = msg
+finalMsg = ComStrUtils.abbreviate(msg, 
sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
 finished = true
 if (!inShutdown && Thread.currentThread() != reporterThread && 
reporterThread != null) {
   logDebug("shutting down reporter thread")

http://git-wip-us.apache.org/repos/asf/spark/blob/f8346d2f/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 1013fd2..ab8273b 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -192,6 +192,12 @@ package object config {
 .toSequence
 .createWithDefault(Nil)
 
+  private[spark] val AM_FINAL_MSG_LIMIT = 
ConfigBuilder("spark.yarn.am.finalMessageLimit")
+.doc("The limit size of final diagnostic message for our ApplicationMaster 
to unregister from" +
+  " the ResourceManager.")
+.bytesConf(ByteUnit.BYTE)
+.createWithDefaultString("1m")
+
   /* Client-mode AM configuration. */
 
   private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")





spark git commit: [SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`

2018-08-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master c20916a5d -> 8bb9414aa


[SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated 
records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

When there are missing offsets, Kafka v2 source may return duplicated records 
when `failOnDataLoss=false` because it doesn't skip missing offsets.

This PR fixes the issue and also adds regression tests for all Kafka readers.
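
A small simulation may help show why duplicates appear. It is a sketch in Python, not the connector's code: `fetch` is a hypothetical stand-in for a consumer that, with `failOnDataLoss=false`, returns the earliest record still available at or after the requested offset.

```python
def fetch(available, offset, until):
    """Return the earliest available offset in [offset, until), or None."""
    candidates = [o for o in available if offset <= o < until]
    return min(candidates) if candidates else None


def read_range(available, start, until, skip_missing):
    """Read [start, until); skip_missing selects the fixed offset bookkeeping."""
    records = []
    next_offset = start
    while next_offset < until:
        record = fetch(available, next_offset, until)
        if record is None:
            break
        records.append(record)
        if skip_missing:
            next_offset = record + 1  # fixed: continue after the record actually read
        else:
            next_offset += 1          # old: assumes offsets are contiguous
    return records


# Offsets 2, 3 and 4 are missing from the topic partition.
available = [0, 1, 5, 6]
print(read_range(available, 0, 7, skip_missing=False))  # [0, 1, 5, 5, 5, 5, 6]
print(read_range(available, 0, 7, skip_missing=True))   # [0, 1, 5, 6]
```

In the simulation, advancing by one after each record re-requests offsets 3, 4 and 5, and each request resolves to record 5 again, which is the duplication the fix removes.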

## How was this patch tested?

New tests.

Closes #22207 from zsxwing/SPARK-25214.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb9414a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb9414a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb9414a

Branch: refs/heads/master
Commit: 8bb9414aaff4a147db2d921dccdbd04c8eb4e5db
Parents: c20916a
Author: Shixiong Zhu 
Authored: Fri Aug 24 12:00:34 2018 -0700
Committer: Shixiong Zhu 
Committed: Fri Aug 24 12:00:34 2018 -0700

--
 .../kafka010/KafkaMicroBatchReadSupport.scala   |   2 +-
 .../spark/sql/kafka010/KafkaSourceRDD.scala |  38 ---
 .../kafka010/KafkaDontFailOnDataLossSuite.scala | 272 +++
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 139 +-
 4 files changed, 276 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index c31af60..70f37e3 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -341,6 +341,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
   val record = consumer.get(nextOffset, rangeToRead.untilOffset, 
pollTimeoutMs, failOnDataLoss)
   if (record != null) {
 nextRow = converter.toUnsafeRow(record)
+nextOffset = record.offset + 1
 true
   } else {
 false
@@ -352,7 +353,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
 
   override def get(): UnsafeRow = {
 assert(nextRow != null)
-nextOffset += 1
 nextRow
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 8b4494d..f8b9005 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
 offsetRanges.zipWithIndex.map { case (o, i) => new 
KafkaSourceRDDPartition(i, o) }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.size).sum
-
-  override def countApprox(timeout: Long, confidence: Double): 
PartialResult[BoundedDouble] = {
-val c = count
-new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
-
-  override def isEmpty(): Boolean = count == 0L
-
-  override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] 
= {
-val nonEmptyPartitions =
-  
this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size
 > 0)
-
-if (num < 1 || nonEmptyPartitions.isEmpty) {
-  return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0)
-}
-
-// Determine in advance how many messages need to be taken from each 
partition
-val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) 
=>
-  val remain = num - result.values.sum
-  if (remain > 0) {
-val taken = Math.min(remain, part.offsetRange.size)
-result + (part.index -> taken.toInt)
-  } else {
-result
-  }
-}
-
-val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
-val res = context.runJob(
-  this,
-  (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], 
Array[Byte]]]) =>
-  it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
-)
-res.foreach(buf ++= _)
-buf.toArray
-  }
-
   override def getPreferredLocat

svn commit: r28935 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_24_08_02-c20916a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-24 Thread pwendell
Author: pwendell
Date: Fri Aug 24 15:16:11 2018
New Revision: 28935

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_24_08_02-c20916a docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25073][YARN] AM and Executor Memory validation message is not proper while submitting spark yarn application

2018-08-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ab3302895 -> c20916a5d


[SPARK-25073][YARN] AM and Executor Memory validation message is not proper 
while submitting spark yarn application

## What changes were proposed in this pull request?
When the memory assigned through `yarn.nodemanager.resource.memory-mb` or `yarn.scheduler.maximum-allocation-mb` is insufficient, Spark always reports an error asking the user to adjust `yarn.scheduler.maximum-allocation-mb`, even when the memory value shown in the message comes from the `yarn.nodemanager.resource.memory-mb` parameter. Because this error message is misleading to the user, this change makes the AM memory validation message consistent with the executor memory validation message.

Definitions:

**yarn.nodemanager.resource.memory-mb:**
The amount of physical memory, in MB, that can be allocated for containers. This is the amount of memory YARN can use on this node, so the property should be lower than the total memory of the machine.

**yarn.scheduler.maximum-allocation-mb:**
The maximum memory allocation available for a container, in MB. The RM can only allocate memory to containers in increments of `yarn.scheduler.minimum-allocation-mb` and may not exceed `yarn.scheduler.maximum-allocation-mb`, which should not be more than the total memory allocated to the node.
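
The check behind this message can be sketched as below. This is a Python illustration only, not the Scala code in `Client.scala`; the function name and parameters are hypothetical, and `max_mem_mb` stands for whatever maximum container memory the cluster reports.

```python
def validate_am_memory(am_memory_mb, overhead_mb, max_mem_mb):
    """Raise if the requested AM memory plus overhead exceeds the cluster maximum."""
    total = am_memory_mb + overhead_mb
    if total > max_mem_mb:
        raise ValueError(
            f"Required AM memory ({am_memory_mb}+{overhead_mb} MB) is above "
            f"the max threshold ({max_mem_mb} MB) of this cluster! "
            "Please check the values of 'yarn.scheduler.maximum-allocation-mb' "
            "and/or 'yarn.nodemanager.resource.memory-mb'.")
    return total


print(validate_am_memory(512, 384, 1024))  # 896
```

Naming both settings in the message reflects that either one may be the limiting value.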

## How was this patch tested?
Manually tested in an hdfs-Yarn cluster.

Closes #22199 from sujith71955/maste_am_log.

Authored-by: s71955 
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c20916a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c20916a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c20916a5

Branch: refs/heads/master
Commit: c20916a5dc4a7e771463838e797cb944569f6259
Parents: ab33028
Author: s71955 
Authored: Fri Aug 24 08:58:19 2018 -0500
Committer: Sean Owen 
Committed: Fri Aug 24 08:58:19 2018 -0500

--
 .../yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c20916a5/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 75614a4..698fc2c 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -344,7 +344,8 @@ private[spark] class Client(
 if (amMem > maxMem) {
   throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
 s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of 
this cluster! " +
-"Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
+"Please check the values of 'yarn.scheduler.maximum-allocation-mb' 
and/or " +
+"'yarn.nodemanager.resource.memory-mb'.")
 }
 logInfo("Will allocate AM container, with %d MB memory including %d MB 
overhead".format(
   amMem,





svn commit: r28926 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_24_00_01-ab33028-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-24 Thread pwendell
Author: pwendell
Date: Fri Aug 24 07:16:18 2018
New Revision: 28926

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_24_00_01-ab33028 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]
