[spark] branch master updated: [SPARK-36336][SQL][FOLLOWUP][SPARK] Mark exception `private`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new df3e5a6 [SPARK-36336][SQL][FOLLOWUP][SPARK] Mark exception `private` df3e5a6 is described below commit df3e5a620aef91ddbbdb6363eef64696ac6dad25 Author: PengLei AuthorDate: Mon Aug 30 10:08:44 2021 +0900 [SPARK-36336][SQL][FOLLOWUP][SPARK] Mark exception `private` ### What changes were proposed in this pull request? Mark the added exceptions as `private[spark]` according to the [comments](https://github.com/apache/spark/pull/33573#discussion_r696324962) ### Why are the changes needed? [comments](https://github.com/apache/spark/pull/33573#discussion_r696324962) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit test cases Closes #33856 from Peng-Lei/SPARK-36336-FOLLOW. Authored-by: PengLei Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/SparkException.scala| 40 +++--- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 7cdb399..28b8ddd 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -90,7 +90,7 @@ private[spark] class SparkArithmeticException(errorClass: String, messageParamet /** * Class not found exception thrown from Spark with an error class. */ -class SparkClassNotFoundException( +private[spark] class SparkClassNotFoundException( errorClass: String, messageParameters: Array[String], cause: Throwable = null) @@ -104,7 +104,7 @@ class SparkClassNotFoundException( /** * Concurrent modification exception thrown from Spark with an error class. 
*/ -class SparkConcurrentModificationException( +private[spark] class SparkConcurrentModificationException( errorClass: String, messageParameters: Array[String], cause: Throwable = null) @@ -118,7 +118,7 @@ class SparkConcurrentModificationException( /** * Datetime exception thrown from Spark with an error class. */ -class SparkDateTimeException(errorClass: String, messageParameters: Array[String]) +private[spark] class SparkDateTimeException(errorClass: String, messageParameters: Array[String]) extends DateTimeException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -129,7 +129,9 @@ class SparkDateTimeException(errorClass: String, messageParameters: Array[String /** * Hadoop file already exists exception thrown from Spark with an error class. */ -class SparkFileAlreadyExistsException(errorClass: String, messageParameters: Array[String]) +private[spark] class SparkFileAlreadyExistsException( +errorClass: String, +messageParameters: Array[String]) extends FileAlreadyExistsException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -140,7 +142,9 @@ class SparkFileAlreadyExistsException(errorClass: String, messageParameters: Arr /** * File not found exception thrown from Spark with an error class. */ -class SparkFileNotFoundException(errorClass: String, messageParameters: Array[String]) +private[spark] class SparkFileNotFoundException( +errorClass: String, +messageParameters: Array[String]) extends FileNotFoundException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -151,7 +155,9 @@ class SparkFileNotFoundException(errorClass: String, messageParameters: Array[St /** * No such method exception thrown from Spark with an error class. 
*/ -class SparkNoSuchMethodException(errorClass: String, messageParameters: Array[String]) +private[spark] class SparkNoSuchMethodException( +errorClass: String, +messageParameters: Array[String]) extends NoSuchMethodException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -162,7 +168,9 @@ class SparkNoSuchMethodException(errorClass: String, messageParameters: Array[St /** * Index out of bounds exception thrown from Spark with an error class. */ -class SparkIndexOutOfBoundsException(errorClass: String, messageParameters: Array[String]) +private[spark] class SparkIndexOutOfBoundsException( +errorClass: String, +messageParameters: Array[String]) extends IndexOutOfBoundsException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -173,7 +181,9 @@ class SparkIndexOutOfBoundsException(errorClass: String, messageParameters: Arra /** * IO exception thrown from Spark with an error class. */ -class SparkIOException(errorClass: String, messageParameters:
[spark] branch branch-3.1 updated: [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new c8d05b1 [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB c8d05b1 is described below commit c8d05b12f78a7a63d65fa376bbf3aac352f4cb10 Author: Sean Owen AuthorDate: Sun Aug 29 09:29:23 2021 -0700 [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB ### What changes were proposed in this pull request? Use WeakReference not SoftReference in LevelDB ### Why are the changes needed? (See discussion at https://github.com/apache/spark/pull/28769#issuecomment-906722390 ) "The soft reference to iterator introduced in this pr unfortunately ended up causing iterators to not be closed when they go out of scope (which would have happened earlier in the finalize) This is because java is more conservative in cleaning up SoftReference's. The net result was we ended up having 50k files for SHS while typically they get compacted away to 200 odd files. Changing from SoftReference to WeakReference should make it much more aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS" ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #33859 from srowen/SPARK-36603. 
Authored-by: Sean Owen Signed-off-by: Dongjoon Hyun (cherry picked from commit 89e907f76c7143ac595c71d4ac3eed8440a3c148) Signed-off-by: Dongjoon Hyun (cherry picked from commit b76471c5dfd8ffc9410ac522404026bee938dfd6) Signed-off-by: Dongjoon Hyun --- .../src/main/java/org/apache/spark/util/kvstore/LevelDB.java | 11 ++- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 121dfbd..6b28373 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore; import java.io.File; import java.io.IOException; -import java.lang.ref.SoftReference; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -69,10 +70,10 @@ public class LevelDB implements KVStore { /** * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to - * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference + * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references * to ensure that the iterator can be GCed, when it is only referenced here. 
*/ - private final ConcurrentLinkedQueue>> iteratorTracker; + private final ConcurrentLinkedQueue>> iteratorTracker; public LevelDB(File path) throws Exception { this(path, new KVStoreSerializer()); @@ -250,7 +251,7 @@ public class LevelDB implements KVStore { public Iterator iterator() { try { LevelDBIterator it = new LevelDBIterator<>(type, LevelDB.this, this); - iteratorTracker.add(new SoftReference<>(it)); + iteratorTracker.add(new WeakReference<>(it)); return it; } catch (Exception e) { throw Throwables.propagate(e); @@ -301,7 +302,7 @@ public class LevelDB implements KVStore { try { if (iteratorTracker != null) { - for (SoftReference> ref: iteratorTracker) { + for (Reference> ref: iteratorTracker) { LevelDBIterator it = ref.get(); if (it != null) { it.close(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new b76471c [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB b76471c is described below commit b76471c5dfd8ffc9410ac522404026bee938dfd6 Author: Sean Owen AuthorDate: Sun Aug 29 09:29:23 2021 -0700 [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB ### What changes were proposed in this pull request? Use WeakReference not SoftReference in LevelDB ### Why are the changes needed? (See discussion at https://github.com/apache/spark/pull/28769#issuecomment-906722390 ) "The soft reference to iterator introduced in this pr unfortunately ended up causing iterators to not be closed when they go out of scope (which would have happened earlier in the finalize) This is because java is more conservative in cleaning up SoftReference's. The net result was we ended up having 50k files for SHS while typically they get compacted away to 200 odd files. Changing from SoftReference to WeakReference should make it much more aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS" ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #33859 from srowen/SPARK-36603. 
Authored-by: Sean Owen Signed-off-by: Dongjoon Hyun (cherry picked from commit 89e907f76c7143ac595c71d4ac3eed8440a3c148) Signed-off-by: Dongjoon Hyun --- .../src/main/java/org/apache/spark/util/kvstore/LevelDB.java | 11 ++- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 121dfbd..6b28373 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore; import java.io.File; import java.io.IOException; -import java.lang.ref.SoftReference; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -69,10 +70,10 @@ public class LevelDB implements KVStore { /** * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to - * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference + * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references * to ensure that the iterator can be GCed, when it is only referenced here. 
*/ - private final ConcurrentLinkedQueue>> iteratorTracker; + private final ConcurrentLinkedQueue>> iteratorTracker; public LevelDB(File path) throws Exception { this(path, new KVStoreSerializer()); @@ -250,7 +251,7 @@ public class LevelDB implements KVStore { public Iterator iterator() { try { LevelDBIterator it = new LevelDBIterator<>(type, LevelDB.this, this); - iteratorTracker.add(new SoftReference<>(it)); + iteratorTracker.add(new WeakReference<>(it)); return it; } catch (Exception e) { throw Throwables.propagate(e); @@ -301,7 +302,7 @@ public class LevelDB implements KVStore { try { if (iteratorTracker != null) { - for (SoftReference> ref: iteratorTracker) { + for (Reference> ref: iteratorTracker) { LevelDBIterator it = ref.get(); if (it != null) { it.close(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 89e907f [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB 89e907f is described below commit 89e907f76c7143ac595c71d4ac3eed8440a3c148 Author: Sean Owen AuthorDate: Sun Aug 29 09:29:23 2021 -0700 [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB ### What changes were proposed in this pull request? Use WeakReference not SoftReference in LevelDB ### Why are the changes needed? (See discussion at https://github.com/apache/spark/pull/28769#issuecomment-906722390 ) "The soft reference to iterator introduced in this pr unfortunately ended up causing iterators to not be closed when they go out of scope (which would have happened earlier in the finalize) This is because java is more conservative in cleaning up SoftReference's. The net result was we ended up having 50k files for SHS while typically they get compacted away to 200 odd files. Changing from SoftReference to WeakReference should make it much more aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS" ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #33859 from srowen/SPARK-36603. 
Authored-by: Sean Owen Signed-off-by: Dongjoon Hyun --- .../src/main/java/org/apache/spark/util/kvstore/LevelDB.java | 11 ++- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 121dfbd..6b28373 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore; import java.io.File; import java.io.IOException; -import java.lang.ref.SoftReference; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -69,10 +70,10 @@ public class LevelDB implements KVStore { /** * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to - * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference + * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references * to ensure that the iterator can be GCed, when it is only referenced here. 
*/ - private final ConcurrentLinkedQueue>> iteratorTracker; + private final ConcurrentLinkedQueue>> iteratorTracker; public LevelDB(File path) throws Exception { this(path, new KVStoreSerializer()); @@ -250,7 +251,7 @@ public class LevelDB implements KVStore { public Iterator iterator() { try { LevelDBIterator it = new LevelDBIterator<>(type, LevelDB.this, this); - iteratorTracker.add(new SoftReference<>(it)); + iteratorTracker.add(new WeakReference<>(it)); return it; } catch (Exception e) { throw Throwables.propagate(e); @@ -301,7 +302,7 @@ public class LevelDB implements KVStore { try { if (iteratorTracker != null) { - for (SoftReference> ref: iteratorTracker) { + for (Reference> ref: iteratorTracker) { LevelDBIterator it = ref.get(); if (it != null) { it.close(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (7d1be37 -> 9cefde8)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 7d1be37 [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option add 9cefde8 [SPARK-36580][CORE][K8S] Use `intersect` and `diff` API on `Set` instead of manual implementation No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 2 +- .../org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7d1be37 [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option 7d1be37 is described below commit 7d1be3710446c23606c3871e28d211ad9776 Author: Andrew Olson AuthorDate: Sun Aug 29 16:38:29 2021 +0900 [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option ### What changes were proposed in this pull request? Proposing that the `KafkaOffsetRangeCalculator`'s range calculation logic be modified to exclude small (i.e. un-split) partitions from the overall proportional distribution math, in order to more reasonably divide the large partitions when they are accompanied by many small partitions, and to provide optimal behavior for cases where a `minPartitions` value is deliberately computed based on the volume of data being read. ### Why are the changes needed? While the [documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) does contain a clear disclaimer, > Please note that this configuration is like a hint: the number of Spark tasks will be **approximately** `minPartitions`. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data. there are cases where the calculated Kafka partition range splits can differ greatly from expectations. For evenly distributed data and most `minPartitions` values this would not be a major or commonly encountered concern. However when the distribution of data across partitions is very heavily skewed, somewhat surprising range split calculations can result. 
For example, given the following input data: - 1 partition containing 10,000 messages - 1,000 partitions each containing 1 message Spark processing code loading from this collection of 1,001 partitions may decide that it would like each task to read no more than 1,000 messages. Consequently, it could specify a `minPartitions` value of 1,010 — expecting the single large partition to be split into 10 equal chunks, along with the 1,000 small partitions each having their own task. That is far from what actually occurs. The `KafkaOffsetRangeCalculator` algorithm ends up splitting the large partition into 918 chunks of [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests and added new unit tests Closes #33827 from noslowerdna/SPARK-36576. Authored-by: Andrew Olson Signed-off-by: Jungtaek Lim --- .../sql/kafka010/KafkaOffsetRangeCalculator.scala | 31 ++-- .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 58 -- .../sql/kafka010/KafkaOffsetReaderSuite.scala | 4 +- 3 files changed, 84 insertions(+), 9 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index 1e9a62e..4c0620a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -33,12 +33,13 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int * Calculate the offset ranges that we are going to process this batch. If `minPartitions` * is not set or is set less than or equal the number of `topicPartitions` that we're going to * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. 
If - * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up * the read tasks of the skewed partitions to multiple Spark tasks. - * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more * depending on rounding errors or Kafka partitions that didn't receive any new data. * - * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped. + * Empty (`KafkaOffsetRange.size == 0`) or invalid (`KafkaOffsetRange.size < 0`) ranges will be + * dropped. */ def getRanges( ranges: Seq[KafkaOffsetRange], @@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int // Splits offset ranges with relatively large amount of data to smaller ones. val totalSize =