[spark] branch master updated: [SPARK-36336][SQL][FOLLOWUP][SPARK] Mark exception `private`

2021-08-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new df3e5a6  [SPARK-36336][SQL][FOLLOWUP][SPARK] Mark exception `private`
df3e5a6 is described below

commit df3e5a620aef91ddbbdb6363eef64696ac6dad25
Author: PengLei 
AuthorDate: Mon Aug 30 10:08:44 2021 +0900

[SPARK-36336][SQL][FOLLOWUP][SPARK] Mark exception `private`

### What changes were proposed in this pull request?

Mark the exceptions added in SPARK-36336 as `private[spark]`, per the
[comments](https://github.com/apache/spark/pull/33573#discussion_r696324962)

### Why are the changes needed?
[comments](https://github.com/apache/spark/pull/33573#discussion_r696324962)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit test cases
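For readers unfamiliar with the error-class exception pattern that this commit marks internal, here is a hypothetical Java analogue (not Spark's actual Scala code; the class and method names are illustrative). Scala's `private[spark]` has no exact Java equivalent; package-private visibility is the closest analogue for keeping the type out of user-facing API.

```java
// Hypothetical sketch of the pattern in the diff: an exception constructed
// from an error class plus message parameters. Names are illustrative only.
class SparkStyleException extends RuntimeException {
  private final String errorClass;

  SparkStyleException(String errorClass, String[] messageParameters) {
    // A real implementation would look a message template up by error class;
    // here we simply join the parameters for illustration.
    super(errorClass + ": " + String.join(", ", messageParameters));
    this.errorClass = errorClass;
  }

  String getErrorClass() { return errorClass; }
}

public class ErrorClassDemo {
  public static void main(String[] args) {
    SparkStyleException e =
        new SparkStyleException("CLASS_NOT_FOUND", new String[] {"com.example.Missing"});
    System.out.println(e.getMessage());
  }
}
```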

Closes #33856 from Peng-Lei/SPARK-36336-FOLLOW.

Authored-by: PengLei 
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/SparkException.scala    | 40 ++++++++++++++++++++++++++++------------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 7cdb399..28b8ddd 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -90,7 +90,7 @@ private[spark] class SparkArithmeticException(errorClass: String, messageParamet
 /**
  * Class not found exception thrown from Spark with an error class.
  */
-class SparkClassNotFoundException(
+private[spark] class SparkClassNotFoundException(
     errorClass: String,
     messageParameters: Array[String],
     cause: Throwable = null)
@@ -104,7 +104,7 @@ class SparkClassNotFoundException(
 /**
  * Concurrent modification exception thrown from Spark with an error class.
  */
-class SparkConcurrentModificationException(
+private[spark] class SparkConcurrentModificationException(
     errorClass: String,
     messageParameters: Array[String],
     cause: Throwable = null)
@@ -118,7 +118,7 @@ class SparkConcurrentModificationException(
 /**
  * Datetime exception thrown from Spark with an error class.
  */
-class SparkDateTimeException(errorClass: String, messageParameters: Array[String])
+private[spark] class SparkDateTimeException(errorClass: String, messageParameters: Array[String])
   extends DateTimeException(
     SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {
 
@@ -129,7 +129,9 @@ class SparkDateTimeException(errorClass: String, messageParameters: Array[String
 /**
  * Hadoop file already exists exception thrown from Spark with an error class.
  */
-class SparkFileAlreadyExistsException(errorClass: String, messageParameters: Array[String])
+private[spark] class SparkFileAlreadyExistsException(
+    errorClass: String,
+    messageParameters: Array[String])
   extends FileAlreadyExistsException(
     SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {
 
@@ -140,7 +142,9 @@ class SparkFileAlreadyExistsException(errorClass: String, messageParameters: Arr
 /**
  * File not found exception thrown from Spark with an error class.
  */
-class SparkFileNotFoundException(errorClass: String, messageParameters: Array[String])
+private[spark] class SparkFileNotFoundException(
+    errorClass: String,
+    messageParameters: Array[String])
   extends FileNotFoundException(
     SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {
 
@@ -151,7 +155,9 @@ class SparkFileNotFoundException(errorClass: String, messageParameters: Array[St
 /**
  * No such method exception thrown from Spark with an error class.
  */
-class SparkNoSuchMethodException(errorClass: String, messageParameters: Array[String])
+private[spark] class SparkNoSuchMethodException(
+    errorClass: String,
+    messageParameters: Array[String])
   extends NoSuchMethodException(
     SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {
 
@@ -162,7 +168,9 @@ class SparkNoSuchMethodException(errorClass: String, messageParameters: Array[St
 /**
  * Index out of bounds exception thrown from Spark with an error class.
  */
-class SparkIndexOutOfBoundsException(errorClass: String, messageParameters: Array[String])
+private[spark] class SparkIndexOutOfBoundsException(
+    errorClass: String,
+    messageParameters: Array[String])
   extends IndexOutOfBoundsException(
     SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {
 
@@ -173,7 +181,9 @@ class SparkIndexOutOfBoundsException(errorClass: String, messageParameters: Arra
 /**
  * IO exception thrown from Spark with an error class.
  */
-class SparkIOException(errorClass: String, messageParameters: 

[spark] branch branch-3.1 updated: [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

2021-08-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new c8d05b1  [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB
c8d05b1 is described below

commit c8d05b12f78a7a63d65fa376bbf3aac352f4cb10
Author: Sean Owen 
AuthorDate: Sun Aug 29 09:29:23 2021 -0700

[SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

### What changes were proposed in this pull request?

Use WeakReference not SoftReference in LevelDB

### Why are the changes needed?

(See discussion at 
https://github.com/apache/spark/pull/28769#issuecomment-906722390 )

"The soft reference to iterator introduced in this PR unfortunately ended
up causing iterators to not be closed when they go out of scope (which would
have happened earlier in the finalize).

This is because Java is more conservative in cleaning up SoftReferences.
The net result was we ended up having 50k files for SHS while typically
they get compacted away to 200 odd files.

Changing from SoftReference to WeakReference should make it much more
aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS"

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #33859 from srowen/SPARK-36603.

Authored-by: Sean Owen 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 89e907f76c7143ac595c71d4ac3eed8440a3c148)
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit b76471c5dfd8ffc9410ac522404026bee938dfd6)
Signed-off-by: Dongjoon Hyun 
---
 .../src/main/java/org/apache/spark/util/kvstore/LevelDB.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 121dfbd..6b28373 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.ref.SoftReference;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -69,10 +70,10 @@ public class LevelDB implements KVStore {
 
   /**
    * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
-   * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
+   * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references
    * to ensure that the iterator can be GCed, when it is only referenced here.
    */
-  private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;
+  private final ConcurrentLinkedQueue<Reference<LevelDBIterator<?>>> iteratorTracker;
 
   public LevelDB(File path) throws Exception {
     this(path, new KVStoreSerializer());
@@ -250,7 +251,7 @@ public class LevelDB implements KVStore {
   public Iterator<T> iterator() {
     try {
       LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
-      iteratorTracker.add(new SoftReference<>(it));
+      iteratorTracker.add(new WeakReference<>(it));
       return it;
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -301,7 +302,7 @@ public class LevelDB implements KVStore {
 
     try {
       if (iteratorTracker != null) {
-        for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
+        for (Reference<LevelDBIterator<?>> ref: iteratorTracker) {
          LevelDBIterator<?> it = ref.get();
          if (it != null) {
            it.close();
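The quoted rationale for this change can be sketched with a minimal, self-contained example (not Spark code): once the last strong reference to an object is gone, a `WeakReference` becomes eligible for clearing at the very next GC, whereas a `SoftReference` is normally retained until the JVM runs low on memory. Exact timing is JVM-dependent, so treat the "typical" outcome as a hint, not a guarantee.

```java
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

// Minimal sketch of WeakReference vs SoftReference clearing behavior.
public class RefClearingDemo {
  public static boolean[] clearedAfterGc() {
    Object target = new Object();
    Reference<Object> weak = new WeakReference<>(target);
    Reference<Object> soft = new SoftReference<>(target);

    target = null;   // drop the only strong reference
    System.gc();     // a hint only; collection is not strictly guaranteed

    // {weak cleared?, soft cleared?} -- typically {true, false} here,
    // since there is no memory pressure to force soft-reference clearing.
    return new boolean[] {weak.get() == null, soft.get() == null};
  }

  public static void main(String[] args) {
    boolean[] r = clearedAfterGc();
    System.out.println("weak cleared: " + r[0] + ", soft cleared: " + r[1]);
  }
}
```

This is exactly why the iterator tracker above became more aggressive about cleanup after the switch: weakly tracked iterators stop pinning files once callers drop them.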

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.2 updated: [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

2021-08-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new b76471c  [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB
b76471c is described below

commit b76471c5dfd8ffc9410ac522404026bee938dfd6
Author: Sean Owen 
AuthorDate: Sun Aug 29 09:29:23 2021 -0700

[SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

### What changes were proposed in this pull request?

Use WeakReference not SoftReference in LevelDB

### Why are the changes needed?

(See discussion at 
https://github.com/apache/spark/pull/28769#issuecomment-906722390 )

"The soft reference to iterator introduced in this PR unfortunately ended
up causing iterators to not be closed when they go out of scope (which would
have happened earlier in the finalize).

This is because Java is more conservative in cleaning up SoftReferences.
The net result was we ended up having 50k files for SHS while typically
they get compacted away to 200 odd files.

Changing from SoftReference to WeakReference should make it much more
aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS"

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #33859 from srowen/SPARK-36603.

Authored-by: Sean Owen 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 89e907f76c7143ac595c71d4ac3eed8440a3c148)
Signed-off-by: Dongjoon Hyun 
---
 .../src/main/java/org/apache/spark/util/kvstore/LevelDB.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 121dfbd..6b28373 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.ref.SoftReference;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -69,10 +70,10 @@ public class LevelDB implements KVStore {
 
   /**
    * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
-   * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
+   * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references
    * to ensure that the iterator can be GCed, when it is only referenced here.
    */
-  private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;
+  private final ConcurrentLinkedQueue<Reference<LevelDBIterator<?>>> iteratorTracker;
 
   public LevelDB(File path) throws Exception {
     this(path, new KVStoreSerializer());
@@ -250,7 +251,7 @@ public class LevelDB implements KVStore {
   public Iterator<T> iterator() {
     try {
       LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
-      iteratorTracker.add(new SoftReference<>(it));
+      iteratorTracker.add(new WeakReference<>(it));
       return it;
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -301,7 +302,7 @@ public class LevelDB implements KVStore {
 
     try {
       if (iteratorTracker != null) {
-        for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
+        for (Reference<LevelDBIterator<?>> ref: iteratorTracker) {
          LevelDBIterator<?> it = ref.get();
          if (it != null) {
            it.close();




[spark] branch master updated: [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

2021-08-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 89e907f  [SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB
89e907f is described below

commit 89e907f76c7143ac595c71d4ac3eed8440a3c148
Author: Sean Owen 
AuthorDate: Sun Aug 29 09:29:23 2021 -0700

[SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

### What changes were proposed in this pull request?

Use WeakReference not SoftReference in LevelDB

### Why are the changes needed?

(See discussion at 
https://github.com/apache/spark/pull/28769#issuecomment-906722390 )

"The soft reference to iterator introduced in this PR unfortunately ended
up causing iterators to not be closed when they go out of scope (which would
have happened earlier in the finalize).

This is because Java is more conservative in cleaning up SoftReferences.
The net result was we ended up having 50k files for SHS while typically
they get compacted away to 200 odd files.

Changing from SoftReference to WeakReference should make it much more
aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS"

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #33859 from srowen/SPARK-36603.

Authored-by: Sean Owen 
Signed-off-by: Dongjoon Hyun 
---
 .../src/main/java/org/apache/spark/util/kvstore/LevelDB.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 121dfbd..6b28373 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.ref.SoftReference;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -69,10 +70,10 @@ public class LevelDB implements KVStore {
 
   /**
    * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
-   * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
+   * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references
    * to ensure that the iterator can be GCed, when it is only referenced here.
    */
-  private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;
+  private final ConcurrentLinkedQueue<Reference<LevelDBIterator<?>>> iteratorTracker;
 
   public LevelDB(File path) throws Exception {
     this(path, new KVStoreSerializer());
@@ -250,7 +251,7 @@ public class LevelDB implements KVStore {
   public Iterator<T> iterator() {
     try {
       LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
-      iteratorTracker.add(new SoftReference<>(it));
+      iteratorTracker.add(new WeakReference<>(it));
       return it;
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -301,7 +302,7 @@ public class LevelDB implements KVStore {
 
     try {
       if (iteratorTracker != null) {
-        for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
+        for (Reference<LevelDBIterator<?>> ref: iteratorTracker) {
          LevelDBIterator<?> it = ref.get();
          if (it != null) {
            it.close();




[spark] branch master updated (7d1be37 -> 9cefde8)

2021-08-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 7d1be37  [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
 add 9cefde8  [SPARK-36580][CORE][K8S] Use `intersect` and `diff` API on `Set` instead of manual implementation

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala | 2 +-
 .../scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 2 +-
 .../org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
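The commit above replaces hand-rolled loops with Scala's `Set#intersect` and `Set#diff`. As an illustrative analogue only (this digest's other diffs are Java, so the sketch is in Java), the same operations on Java collections are `retainAll`/`removeAll` applied to a copy; the executor-id values are made up for the example.

```java
import java.util.HashSet;
import java.util.Set;

// Set intersection and difference via copy-then-mutate, the Java idiom
// corresponding to Scala's immutable Set#intersect and Set#diff.
public class SetOpsDemo {
  public static Set<String> intersect(Set<String> a, Set<String> b) {
    Set<String> result = new HashSet<>(a);
    result.retainAll(b);   // keep only elements also in b (a intersect b)
    return result;
  }

  public static Set<String> diff(Set<String> a, Set<String> b) {
    Set<String> result = new HashSet<>(a);
    result.removeAll(b);   // drop elements present in b (a minus b)
    return result;
  }

  public static void main(String[] args) {
    Set<String> live = Set.of("exec-1", "exec-2", "exec-3");
    Set<String> requested = Set.of("exec-2", "exec-3", "exec-4");
    System.out.println(intersect(live, requested));
    System.out.println(diff(live, requested));
  }
}
```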




[spark] branch master updated: [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option

2021-08-29 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7d1be37  [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
7d1be37 is described below

commit 7d1be3710446c23606c3871e28d211ad9776
Author: Andrew Olson 
AuthorDate: Sun Aug 29 16:38:29 2021 +0900

[SPARK-36576][SS] Improve range split calculation for Kafka Source 
minPartitions option

### What changes were proposed in this pull request?

Proposing that the `KafkaOffsetRangeCalculator`'s range calculation logic 
be modified to exclude small (i.e. un-split) partitions from the overall 
proportional distribution math, in order to more reasonably divide the large 
partitions when they are accompanied by many small partitions, and to provide 
optimal behavior for cases where a `minPartitions` value is deliberately 
computed based on the volume of data being read.

### Why are the changes needed?

While the 
[documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
 does contain a clear disclaimer,

> Please note that this configuration is like a hint: the number of Spark 
tasks will be **approximately** `minPartitions`. It can be less or more 
depending on rounding errors or Kafka partitions that didn't receive any new 
data.

there are cases where the calculated Kafka partition range splits can 
differ greatly from expectations. For evenly distributed data and most 
`minPartitions `values this would not be a major or commonly encountered 
concern. However when the distribution of data across partitions is very 
heavily skewed, somewhat surprising range split calculations can result.

For example, given the following input data:

- 1 partition containing 10,000 messages
- 1,000 partitions each containing 1 message

Spark processing code loading from this collection of 1,001 partitions may 
decide that it would like each task to read no more than 1,000 messages. 
Consequently, it could specify a `minPartitions` value of 1,010 — expecting the 
single large partition to be split into 10 equal chunks, along with the 1,000 
small partitions each having their own task. That is far from what actually 
occurs. The `KafkaOffsetRangeCalculator` algorithm ends up splitting the large 
partition into 918 chunks of [...]
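The 918 figure follows directly from the proportional math described above. A minimal sketch (method names are illustrative, not Spark internals): each Kafka partition is split into `max(1, round(size / totalSize * minPartitions))` ranges, so small partitions that round to zero still cost one task each while inflating `totalSize` and shrinking the large partition's share.

```java
// Sketch of the pre-fix proportional split calculation.
public class RangeSplitDemo {
  // Number of splits assigned to one partition of the given size.
  public static int parts(long size, long totalSize, int minPartitions) {
    return (int) Math.max(1, Math.round((double) size / totalSize * minPartitions));
  }

  public static void main(String[] args) {
    long largeSize = 10_000L;                 // one partition with 10,000 messages
    long smallCount = 1_000L;                 // 1,000 partitions of 1 message each
    long totalSize = largeSize + smallCount;  // 11,000 messages overall
    int minPartitions = 1_010;

    // round(10000 / 11000 * 1010) = 918 splits for the large partition,
    // not the 10 one might expect; each small partition still gets 1 task.
    int largeParts = parts(largeSize, totalSize, minPartitions);
    System.out.println("large partition splits: " + largeParts);
    System.out.println("total tasks: " + (largeParts + smallCount));
  }
}
```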

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests and added new unit tests

Closes #33827 from noslowerdna/SPARK-36576.

Authored-by: Andrew Olson 
Signed-off-by: Jungtaek Lim 
---
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  | 31 ++--
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 58 --
 .../sql/kafka010/KafkaOffsetReaderSuite.scala  |  4 +-
 3 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 1e9a62e..4c0620a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -33,12 +33,13 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * Calculate the offset ranges that we are going to process this batch. If `minPartitions`
    * is not set or is set less than or equal the number of `topicPartitions` that we're going to
    * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
-   * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up
+   * `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
    * the read tasks of the skewed partitions to multiple Spark tasks.
-   * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
+   * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
    *
-   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
+   * Empty (`KafkaOffsetRange.size == 0`) or invalid (`KafkaOffsetRange.size < 0`) ranges will be
+   * dropped.
    */
   def getRanges(
       ranges: Seq[KafkaOffsetRange],
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
     // Splits offset ranges with relatively large amount of data to smaller ones.
     val totalSize =