spark git commit: [MINOR] Fix Scala 2.12 build
Repository: spark Updated Branches: refs/heads/master 8e6427871 -> 3e4f1666a [MINOR] Fix Scala 2.12 build ## What changes were proposed in this pull request? [SPARK-25095](https://github.com/apache/spark/commit/ad45299d047c10472fd3a86103930fe7c54a4cf1) introduced `ambiguous reference to overloaded definition` ``` [error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition, [error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext [error] and method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext [error] match argument types (org.apache.spark.TaskContext => Unit) [error] context.addTaskCompletionListener(_ => server.close()) [error] ^ [error] one error found [error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s] ``` which fails the Scala 2.12 branch build. ## How was this patch tested? Existing tests Closes #9 from dbtsai/fix-2.12-build. 
Authored-by: DB Tsai Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e4f1666 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e4f1666 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e4f1666 Branch: refs/heads/master Commit: 3e4f1666a1253f9d5df05c19b1ce77fe18e9fde3 Parents: 8e64278 Author: DB Tsai Authored: Sat Aug 25 13:48:46 2018 +0800 Committer: hyukjinkwon Committed: Sat Aug 25 13:48:46 2018 +0800 -- core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e4f1666/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index f824191..151c910 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -239,7 +239,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( } // Close ServerSocket on task completion. serverSocket.foreach { server => - context.addTaskCompletionListener(_ => server.close()) + context.addTaskCompletionListener[Unit](_ => server.close()) } val boundPort: Int = serverSocket.map(_.getLocalPort).getOrElse(0) if (boundPort == -1) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
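The one-line fix pins the type parameter so the Scala 2.12 compiler can choose between the two `addTaskCompletionListener` overloads. A rough Java analogue of the same ambiguity is sketched below (hypothetical names; the real methods live on `org.apache.spark.TaskContext`) — two overloads that both accept the lambda, resolved by naming the target type explicitly:

```java
import java.util.function.Function;

public class OverloadDemo {
    // SAM interface standing in for org.apache.spark.util.TaskCompletionListener
    interface Listener { String onTaskCompletion(String context); }

    public static String addListener(Function<String, String> f) {
        return "function: " + f.apply("ctx");
    }

    public static String addListener(Listener l) {
        return "listener: " + l.onTaskCompletion("ctx");
    }

    public static void main(String[] args) {
        // addListener(c -> "closed");   // would not compile: reference to addListener is ambiguous
        // Naming the target type resolves the ambiguity, just as the Scala fix pins
        // the type parameter: context.addTaskCompletionListener[Unit](_ => server.close())
        String result = addListener((Function<String, String>) c -> "closed");
        System.out.println(result);  // function: closed
    }
}
```

The Scala fix chooses the generic `f: TaskContext => U` overload by supplying `[Unit]`; the Java sketch makes the same choice with an explicit functional-interface cast.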
spark git commit: Correct missing punctuation in the documentation
Repository: spark Updated Branches: refs/heads/master 9714fa547 -> 8e6427871 Correct missing punctuation in the documentation ## What changes were proposed in this pull request? Adds a missing period after "the red line" in the structured streaming programming guide. ## How was this patch tested? Documentation-only change; no tests needed. Closes #22189 from movrsprbp/patch-1. Authored-by: jaroslav chládek Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e642787 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e642787 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e642787 Branch: refs/heads/master Commit: 8e6427871a40b82f4a7a28aaa6e197e4e01dc878 Parents: 9714fa5 Author: jaroslav chládek Authored: Sat Aug 25 12:49:48 2018 +0800 Committer: hyukjinkwon Committed: Sat Aug 25 12:49:48 2018 +0800 -- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8e642787/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 355a6cc..73de189 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1005,7 +1005,7 @@ Here is an illustration. As shown in the illustration, the maximum event time tracked by the engine is the *blue dashed line*, and the watermark set as `(max event time - '10 mins')` -at the beginning of every trigger is the red line For example, when the engine observes the data +at the beginning of every trigger is the red line.
For example, when the engine observes the data `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`. This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
svn commit: r28952 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_24_18_01-f598382-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Aug 25 01:15:33 2018 New Revision: 28952 Log: Apache Spark 2.3.3-SNAPSHOT-2018_08_24_18_01-f598382 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r28948 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_24_16_01-9714fa5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 24 23:16:03 2018 New Revision: 28948 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_24_16_01-9714fa5 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25124][ML] VectorSizeHint setSize and getSize don't return values backport to 2.3
Repository: spark Updated Branches: refs/heads/branch-2.3 42c1fdd22 -> f5983823e [SPARK-25124][ML] VectorSizeHint setSize and getSize don't return values backport to 2.3 ## What changes were proposed in this pull request? In feature.py, VectorSizeHint setSize and getSize don't return a value; this adds the missing `return` statements. ## How was this patch tested? Unit test added. Closes #8 from huaxingao/spark-25124-2.3. Authored-by: Huaxin Gao Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5983823 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5983823 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5983823 Branch: refs/heads/branch-2.3 Commit: f5983823e9b4a3b4762481306ea071a73f5742fc Parents: 42c1fdd Author: Huaxin Gao Authored: Fri Aug 24 15:41:18 2018 -0700 Committer: Joseph K. Bradley Committed: Fri Aug 24 15:41:18 2018 -0700 -- python/pyspark/ml/feature.py | 4 ++-- python/pyspark/ml/tests.py | 17 + 2 files changed, 19 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5983823/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 04b07e6..a444fe0 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -3673,12 +3673,12 @@ class VectorSizeHint(JavaTransformer, HasInputCol, HasHandleInvalid, JavaMLReada @since("2.3.0") def getSize(self): """ Gets size param, the size of vectors in `inputCol`.""" -self.getOrDefault(self.size) +return self.getOrDefault(self.size) @since("2.3.0") def setSize(self, value): """ Sets size param, the size of vectors in `inputCol`.""" -self._set(size=value) +return self._set(size=value) if __name__ == "__main__": http://git-wip-us.apache.org/repos/asf/spark/blob/f5983823/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 1af2b91..49912d2
100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -678,6 +678,23 @@ class FeatureTests(SparkSessionTestCase): expected2 = [Row(id=0, indexed=0.0), Row(id=1, indexed=1.0)] self.assertEqual(actual2, expected2) +def test_vector_size_hint(self): +df = self.spark.createDataFrame( +[(0, Vectors.dense([0.0, 10.0, 0.5])), + (1, Vectors.dense([1.0, 11.0, 0.5, 0.6])), + (2, Vectors.dense([2.0, 12.0]))], +["id", "vector"]) + +sizeHint = VectorSizeHint( +inputCol="vector", +handleInvalid="skip") +sizeHint.setSize(3) +self.assertEqual(sizeHint.getSize(), 3) + +output = sizeHint.transform(df).head().vector +expected = DenseVector([0.0, 10.0, 0.5]) +self.assertEqual(output, expected) + class HasInducedError(Params):
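The bug in feature.py is the classic "computed but never returned" setter/getter. A minimal Java sketch of the same pattern (hypothetical class name, not the real PySpark API) shows why returning the receiver matters for fluent chaining:

```java
public class VectorSizeHintLike {
    // Hypothetical stand-in for the VectorSizeHint param handling: before the
    // fix, setSize()/getSize() computed their result but never returned it,
    // so setSize(...) evaluated to nothing usable and chained calls broke.
    private Integer size;

    public VectorSizeHintLike setSize(int value) {
        this.size = value;
        return this;          // returning the receiver is what makes chaining work
    }

    public Integer getSize() {
        return size;          // without this return, every lookup yields nothing
    }

    public static void main(String[] args) {
        // Chaining works only because setSize returns the object itself.
        Integer s = new VectorSizeHintLike().setSize(3).getSize();
        System.out.println(s);  // 3
    }
}
```

In the Python original the fix is one word per method: `return self._set(size=value)` and `return self.getOrDefault(self.size)`.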
spark git commit: [SPARK-25234][SPARKR] avoid integer overflow in parallelize
Repository: spark Updated Branches: refs/heads/branch-2.3 fcc9bd632 -> 42c1fdd22 [SPARK-25234][SPARKR] avoid integer overflow in parallelize ## What changes were proposed in this pull request? `parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow. ## How was this patch tested? unit test Closes #5 from mengxr/SPARK-25234. Authored-by: Xiangrui Meng Signed-off-by: Xiangrui Meng (cherry picked from commit 9714fa547325ed7b6a8066a88957537936b233dd) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42c1fdd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42c1fdd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42c1fdd2 Branch: refs/heads/branch-2.3 Commit: 42c1fdd229b3cf19ff804b7516eae9d36ae50c81 Parents: fcc9bd6 Author: Xiangrui Meng Authored: Fri Aug 24 15:03:00 2018 -0700 Committer: Xiangrui Meng Committed: Fri Aug 24 15:04:11 2018 -0700 -- R/pkg/R/context.R| 9 - R/pkg/tests/fulltests/test_context.R | 7 +++ 2 files changed, 11 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42c1fdd2/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 443c2ff..25e2d15 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -138,11 +138,10 @@ parallelize <- function(sc, coll, numSlices = 1) { sizeLimit <- getMaxAllocationLimit(sc) objectSize <- object.size(coll) + len <- length(coll) # For large objects we make sure the size of each slice is also smaller than sizeLimit - numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit)) - if (numSerializedSlices > length(coll)) -numSerializedSlices <- length(coll) + numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit))) # Generate the slice ids to put each row # For instance, for numSerializedSlices of 22, length of 50 @@ -153,8 +152,8 @@ parallelize <- function(sc, coll, numSlices 
= 1) { splits <- if (numSerializedSlices > 0) { unlist(lapply(0: (numSerializedSlices - 1), function(x) { # nolint start - start <- trunc((x * length(coll)) / numSerializedSlices) - end <- trunc(((x + 1) * length(coll)) / numSerializedSlices) + start <- trunc((as.numeric(x) * len) / numSerializedSlices) + end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices) # nolint end rep(start, end - start) })) http://git-wip-us.apache.org/repos/asf/spark/blob/42c1fdd2/R/pkg/tests/fulltests/test_context.R -- diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R index f0d0a51..288a271 100644 --- a/R/pkg/tests/fulltests/test_context.R +++ b/R/pkg/tests/fulltests/test_context.R @@ -240,3 +240,10 @@ test_that("add and get file to be downloaded with Spark job on every node", { unlink(path, recursive = TRUE) sparkR.session.stop() }) + +test_that("SPARK-25234: parallelize should not have integer overflow", { + sc <- sparkR.sparkContext(master = sparkRTestMaster) + # 47000 * 47000 exceeds integer range + parallelize(sc, 1:47000, 47000) + sparkR.session.stop() +})
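The slice-boundary arithmetic `trunc((x * length(coll)) / numSerializedSlices)` overflows when `x * length(coll)` exceeds the 32-bit integer range, which is exactly what the `1:47000` with 47000 slices regression test exercises. A small Java sketch of the same arithmetic (simplified names, not Spark code) shows the failure and the fix of widening before multiplying, analogous to `as.numeric(x)` in R:

```java
public class SplitOverflow {
    // Slice boundary as computed before the fix: the 32-bit multiplication overflows.
    public static int brokenStart(int x, int len, int numSlices) {
        return (x * len) / numSlices;
    }

    // Widening one operand to 64 bits before multiplying avoids the overflow,
    // analogous to trunc((as.numeric(x) * len) / numSerializedSlices) in the R fix.
    public static long fixedStart(int x, int len, int numSlices) {
        return ((long) x * len) / numSlices;
    }

    public static void main(String[] args) {
        // Same sizes as the regression test: 46999 * 47000 exceeds Integer.MAX_VALUE.
        System.out.println(brokenStart(46999, 47000, 47000)); // negative garbage from wraparound
        System.out.println(fixedStart(46999, 47000, 47000));  // 46999
    }
}
```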
spark git commit: [SPARK-25234][SPARKR] avoid integer overflow in parallelize
Repository: spark Updated Branches: refs/heads/master f8346d2fc -> 9714fa547 [SPARK-25234][SPARKR] avoid integer overflow in parallelize ## What changes were proposed in this pull request? `parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow. ## How was this patch tested? unit test Closes #5 from mengxr/SPARK-25234. Authored-by: Xiangrui Meng Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9714fa54 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9714fa54 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9714fa54 Branch: refs/heads/master Commit: 9714fa547325ed7b6a8066a88957537936b233dd Parents: f8346d2 Author: Xiangrui Meng Authored: Fri Aug 24 15:03:00 2018 -0700 Committer: Xiangrui Meng Committed: Fri Aug 24 15:03:00 2018 -0700 -- R/pkg/R/context.R| 9 - R/pkg/tests/fulltests/test_context.R | 7 +++ 2 files changed, 11 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9714fa54/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 7e77ea4..f168ca7 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -138,11 +138,10 @@ parallelize <- function(sc, coll, numSlices = 1) { sizeLimit <- getMaxAllocationLimit(sc) objectSize <- object.size(coll) + len <- length(coll) # For large objects we make sure the size of each slice is also smaller than sizeLimit - numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit)) - if (numSerializedSlices > length(coll)) -numSerializedSlices <- length(coll) + numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit))) # Generate the slice ids to put each row # For instance, for numSerializedSlices of 22, length of 50 @@ -153,8 +152,8 @@ parallelize <- function(sc, coll, numSlices = 1) { splits <- if (numSerializedSlices > 0) { unlist(lapply(0: (numSerializedSlices - 1), function(x) { 
# nolint start - start <- trunc((x * length(coll)) / numSerializedSlices) - end <- trunc(((x + 1) * length(coll)) / numSerializedSlices) + start <- trunc((as.numeric(x) * len) / numSerializedSlices) + end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices) # nolint end rep(start, end - start) })) http://git-wip-us.apache.org/repos/asf/spark/blob/9714fa54/R/pkg/tests/fulltests/test_context.R -- diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R index f0d0a51..288a271 100644 --- a/R/pkg/tests/fulltests/test_context.R +++ b/R/pkg/tests/fulltests/test_context.R @@ -240,3 +240,10 @@ test_that("add and get file to be downloaded with Spark job on every node", { unlink(path, recursive = TRUE) sparkR.session.stop() }) + +test_that("SPARK-25234: parallelize should not have integer overflow", { + sc <- sparkR.sparkContext(master = sparkRTestMaster) + # 47000 * 47000 exceeds integer range + parallelize(sc, 1:47000, 47000) + sparkR.session.stop() +})
spark git commit: [SPARK-25174][YARN] Limit the size of diagnostic message for am to unregister itself from rm
Repository: spark Updated Branches: refs/heads/master 8bb9414aa -> f8346d2fc [SPARK-25174][YARN] Limit the size of diagnostic message for am to unregister itself from rm ## What changes were proposed in this pull request? With older Spark releases, one use case generated a huge code-gen file that hit the limitation `Constant pool has grown past JVM limit of 0x`. In that situation the job should fail immediately, but the diagnostic message sent to the RM was so large that the ApplicationMaster hung and the RM's ZKStateStore crashed. Spark 2.3 and later removed that code-gen limitation, but other uncaught exceptions carrying oversized error messages could still cause the same problem. This PR caps the size of the diagnostic message. ## How was this patch tested? Closes #22180 from yaooqinn/SPARK-25174. Authored-by: Kent Yao Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8346d2f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8346d2f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8346d2f Branch: refs/heads/master Commit: f8346d2fc01f1e881e4e3f9c4499bf5f9e3ceb3f Parents: 8bb9414 Author: Kent Yao Authored: Fri Aug 24 13:44:19 2018 -0700 Committer: Marcelo Vanzin Committed: Fri Aug 24 13:44:19 2018 -0700 -- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 5 +++-- .../src/main/scala/org/apache/spark/deploy/yarn/config.scala | 6 ++ 2 files changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f8346d2f/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 55ed114..8f94e3f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import java.lang.reflect.{InvocationTargetException, Modifier} -import java.net.{Socket, URI, URL} +import java.net.{URI, URL} import java.security.PrivilegedExceptionAction import java.util.concurrent.{TimeoutException, TimeUnit} @@ -28,6 +28,7 @@ import scala.concurrent.Promise import scala.concurrent.duration.Duration import scala.util.control.NonFatal +import org.apache.commons.lang3.{StringUtils => ComStrUtils} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ @@ -368,7 +369,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) -finalMsg = msg +finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt) finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { logDebug("shutting down reporter thread") http://git-wip-us.apache.org/repos/asf/spark/blob/f8346d2f/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 1013fd2..ab8273b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -192,6 +192,12 @@ package object config { .toSequence 
.createWithDefault(Nil) + private[spark] val AM_FINAL_MSG_LIMIT = ConfigBuilder("spark.yarn.am.finalMessageLimit") +.doc("The limit size of final diagnostic message for our ApplicationMaster to unregister from" + + " the ResourceManager.") +.bytesConf(ByteUnit.BYTE) +.createWithDefaultString("1m") + /* Client-mode AM configuration. */ private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
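The patch routes the final message through `org.apache.commons.lang3.StringUtils.abbreviate`, capped at the new `spark.yarn.am.finalMessageLimit` (default 1m). A simplified stand-in for that truncation (my own sketch, not the Commons Lang implementation) illustrates the effect:

```java
public class AbbreviateDemo {
    // Simplified stand-in for org.apache.commons.lang3.StringUtils.abbreviate,
    // which the patch uses to cap the AM's final diagnostic message: strings
    // longer than maxWidth are cut and terminated with "...".
    public static String abbreviate(String msg, int maxWidth) {
        if (msg == null || msg.length() <= maxWidth) {
            return msg;                                // short messages pass through untouched
        }
        return msg.substring(0, maxWidth - 3) + "..."; // result is exactly maxWidth chars
    }

    public static void main(String[] args) {
        // A huge diagnostic message, e.g. a giant code-gen stack trace.
        String huge = new String(new char[5000]).replace('\0', 'x');
        String capped = abbreviate(huge, 1024);   // cf. spark.yarn.am.finalMessageLimit
        System.out.println(capped.length());      // 1024
        System.out.println(capped.endsWith("...")); // true
    }
}
```

Capping at the unregister call keeps the RM's state store (e.g. ZKStateStore) from choking on multi-megabyte diagnostics while preserving the start of the message for debugging.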
spark git commit: [SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`
Repository: spark Updated Branches: refs/heads/master c20916a5d -> 8bb9414aa [SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false` ## What changes were proposed in this pull request? When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets. This PR fixes the issue and also adds regression tests for all Kafka readers. ## How was this patch tested? New tests. Closes #22207 from zsxwing/SPARK-25214. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb9414a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb9414a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb9414a Branch: refs/heads/master Commit: 8bb9414aaff4a147db2d921dccdbd04c8eb4e5db Parents: c20916a Author: Shixiong Zhu Authored: Fri Aug 24 12:00:34 2018 -0700 Committer: Shixiong Zhu Committed: Fri Aug 24 12:00:34 2018 -0700 -- .../kafka010/KafkaMicroBatchReadSupport.scala | 2 +- .../spark/sql/kafka010/KafkaSourceRDD.scala | 38 --- .../kafka010/KafkaDontFailOnDataLossSuite.scala | 272 +++ .../kafka010/KafkaMicroBatchSourceSuite.scala | 139 +- 4 files changed, 276 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index c31af60..70f37e3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -341,6 
+341,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader( val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) if (record != null) { nextRow = converter.toUnsafeRow(record) +nextOffset = record.offset + 1 true } else { false @@ -352,7 +353,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader( override def get(): UnsafeRow = { assert(nextRow != null) -nextOffset += 1 nextRow } http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala index 8b4494d..f8b9005 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum - - override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = { -val c = count -new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } - - override def isEmpty(): Boolean = count == 0L - - override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] = { -val nonEmptyPartitions = - this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size > 0) - -if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0) -} - -// Determine in advance how many messages need to be taken from each partition -val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { -val 
taken = Math.min(remain, part.offsetRange.size) -result + (part.index -> taken.toInt) - } else { -result - } -} - -val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] -val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]) => - it.take(parts(tc.partitionId)).toArray, parts.keys.toArray -) -res.foreach(buf ++= _) -buf.toArray - } - override def getPreferredLocat
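The core of the Kafka fix is visible in the `KafkaMicroBatchPartitionReader` hunk: `nextOffset` is now advanced from the offset of the record actually returned (`record.offset + 1`) instead of being incremented by one from the requested position. A minimal Java model of the reader loop (simplified names, not the Spark/Kafka API) shows why the old advance produced duplicates when offsets are missing:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetSkipDemo {
    public static final class Rec {
        final long offset; final String value;
        public Rec(long offset, String value) { this.offset = offset; this.value = value; }
    }

    // With failOnDataLoss=false the consumer may hand back a record at a *later*
    // offset than requested, because the missing offsets were skipped. The old
    // code advanced by +1 from the requested position, re-fetching the same
    // record; the fix advances from the offset of the record actually returned.
    public static List<String> read(List<Rec> log, long from, long until, boolean fixed) {
        List<String> out = new ArrayList<>();
        long next = from;
        while (next < until) {
            Rec rec = null;
            for (Rec c : log) {                       // stands in for consumer.get(next, ...)
                if (c.offset >= next) { rec = c; break; }
            }
            if (rec == null || rec.offset >= until) break;
            out.add(rec.value);
            next = fixed ? rec.offset + 1             // the fix: nextOffset = record.offset + 1
                         : next + 1;                  // old behavior: nextOffset += 1
        }
        return out;
    }

    public static void main(String[] args) {
        // Offsets 1 and 2 are missing, e.g. aged out by Kafka retention.
        List<Rec> log = Arrays.asList(new Rec(0, "a"), new Rec(3, "b"), new Rec(4, "c"));
        System.out.println(read(log, 0, 5, false));   // [a, b, b, b, c]  (duplicates)
        System.out.println(read(log, 0, 5, true));    // [a, b, c]
    }
}
```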
svn commit: r28935 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_24_08_02-c20916a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 24 15:16:11 2018 New Revision: 28935 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_24_08_02-c20916a docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25073][YARN] AM and Executor Memory validation message is not proper while submitting spark yarn application
Repository: spark Updated Branches: refs/heads/master ab3302895 -> c20916a5d [SPARK-25073][YARN] AM and Executor Memory validation message is not proper while submitting spark yarn application **## What changes were proposed in this pull request?** When the yarn.nodemanager.resource.memory-mb or yarn.scheduler.maximum-allocation-mb memory assignment is insufficient, Spark always asks the user to adjust yarn.scheduler.maximum-allocation-mb, even though the limit reported in the message may actually come from yarn.nodemanager.resource.memory-mb. Since this is misleading, the error message is modified to match the executor memory validation message. Definition of **yarn.nodemanager.resource.memory-mb:** the amount of physical memory, in MB, that can be allocated for containers, i.e. the memory YARN may use on this node; it should therefore be lower than the total memory of the machine. **yarn.scheduler.maximum-allocation-mb:** the maximum memory allocation available for a container, in MB; the RM allocates memory to containers in increments of "yarn.scheduler.minimum-allocation-mb" without exceeding "yarn.scheduler.maximum-allocation-mb", which should not be more than the node's total allocated memory. **## How was this patch tested?** Manually tested on an HDFS/YARN cluster. Closes #22199 from sujith71955/maste_am_log.
Authored-by: s71955 Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c20916a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c20916a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c20916a5 Branch: refs/heads/master Commit: c20916a5dc4a7e771463838e797cb944569f6259 Parents: ab33028 Author: s71955 Authored: Fri Aug 24 08:58:19 2018 -0500 Committer: Sean Owen Committed: Fri Aug 24 08:58:19 2018 -0500 -- .../yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c20916a5/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 75614a4..698fc2c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -344,7 +344,8 @@ private[spark] class Client( if (amMem > maxMem) { throw new IllegalArgumentException(s"Required AM memory ($amMemory" + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + -"Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.") +"Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " + +"'yarn.nodemanager.resource.memory-mb'.") } logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( amMem,
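The validation itself is a simple threshold check; only the message wording changes. A Java sketch of the check (hypothetical names; the real logic lives in Spark's YARN `Client`) shows the new message pointing at both settings, since the effective cap is the smaller of the two:

```java
public class AmMemoryCheck {
    // Sketch of the AM memory validation: amMemory + overhead must fit within the
    // cluster's max container allocation. The message names both YARN settings
    // because either one can be the source of the limit.
    public static void verify(int amMemoryMb, int amOverheadMb, int maxAllocMb) {
        int amMem = amMemoryMb + amOverheadMb;
        if (amMem > maxAllocMb) {
            throw new IllegalArgumentException(
                "Required AM memory (" + amMemoryMb + "+" + amOverheadMb + " MB) is above "
                + "the max threshold (" + maxAllocMb + " MB) of this cluster! Please check "
                + "the values of 'yarn.scheduler.maximum-allocation-mb' and/or "
                + "'yarn.nodemanager.resource.memory-mb'.");
        }
    }

    public static void main(String[] args) {
        verify(1024, 384, 2048);   // fits: no exception
        try {
            verify(4096, 384, 2048);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage().contains("yarn.nodemanager.resource.memory-mb")); // true
        }
    }
}
```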
svn commit: r28926 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_24_00_01-ab33028-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 24 07:16:18 2018 New Revision: 28926 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_24_00_01-ab33028 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]