[spark] branch master updated (0811666 -> d8a0d85)

2020-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 0811666  [SPARK-32878][CORE] Avoid scheduling TaskSetManager which has no pending tasks
 add d8a0d85  [SPARK-32884][TESTS] Mark TPCDSQuery*Suite as ExtendedSQLTest

No new revisions were added by this update.

Summary of changes:
 sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala | 4 
 1 file changed, 4 insertions(+)
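
The four inserted lines themselves are not shown in this digest. As a rough, illustrative sketch (assuming Spark's `org.apache.spark.tags.ExtendedSQLTest` ScalaTest tag annotation and a placeholder suite body, neither taken from the actual patch), tagging a suite so CI can schedule it with the other extended SQL tests looks like this:

```scala
// Illustrative sketch only -- the real 4-line change is not reproduced in this digest.
// Assumes Spark's org.apache.spark.tags.ExtendedSQLTest tag annotation and ScalaTest are on the classpath.
import org.apache.spark.tags.ExtendedSQLTest
import org.scalatest.funsuite.AnyFunSuite

@ExtendedSQLTest
class TPCDSQueryDemoSuite extends AnyFunSuite {
  test("placeholder for the TPC-DS query checks") {
    assert(1 + 1 == 2)
  }
}
```

Tagged suites can then be included or excluded by tag name when running the tests (for example via ScalaTest's include/exclude tag options).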


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (7a9b066 -> 0811666)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 7a9b066  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
 add 0811666  [SPARK-32878][CORE] Avoid scheduling TaskSetManager which has no pending tasks

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/org/apache/spark/scheduler/Pool.scala | 4 +++-
 core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala  | 1 +
 .../src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala   | 3 +++
 4 files changed, 8 insertions(+), 2 deletions(-)
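
The digest shows only the diffstat. As a rough sketch of the idea (see below; the names are illustrative assumptions, not Spark's actual scheduler API), the change lets the scheduler skip task-set managers that have nothing left to run instead of repeatedly offering them resources:

```scala
// Illustrative sketch only -- types and method names are assumptions, not Spark's real code.
trait Schedulable {
  /** Whether this entry can still make use of a resource offer. */
  def isSchedulable: Boolean
}

final class TaskSetManagerSketch(var pendingTaskCount: Int, var isZombie: Boolean = false)
    extends Schedulable {
  // A zombie or fully scheduled task set has no pending work to offer resources to.
  override def isSchedulable: Boolean = !isZombie && pendingTaskCount > 0
}

final class PoolSketch(children: Seq[Schedulable]) extends Schedulable {
  override def isSchedulable: Boolean = children.exists(_.isSchedulable)
  // Only hand schedulable entries to the resource-offer loop.
  def sortedTaskSetQueue: Seq[Schedulable] = children.filter(_.isSchedulable)
}
```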


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new cf0c907  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
cf0c907 is described below

commit cf0c907a1926c26f86b165ba7389ded375f0aa35
Author: LantaoJin 
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

[SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

### What changes were proposed in this pull request?
In TorrentBroadcast.scala
```scala
L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
```
After the original value is stored successfully (TorrentBroadcast.scala L133), the subsequent `blockifyObject()` (L137) or piece-storing (L147) steps may still fail, and in that case there is no opportunity to release the broadcast from memory.

This patch removes all pieces of the broadcast when blockifying fails or when storing some pieces of a broadcast fails.
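
A minimal sketch of that error-handling shape (the helper names here, such as `removeAllStoredPieces`, are stand-ins for the cleanup the real patch performs inside `TorrentBroadcast.writeBlocks`, not Spark's actual method names):

```scala
// Sketch only: wrap blockify + piece storage and release everything stored so far on failure.
object BroadcastWriteSketch {
  final case class StoreException(msg: String) extends RuntimeException(msg)

  def blockify(value: Array[Byte], blockSize: Int): Seq[Array[Byte]] =
    value.grouped(blockSize).toSeq

  // Stand-ins for BlockManager.putBytes and for the cleanup step (both hypothetical here).
  def storePiece(id: Long, i: Int, block: Array[Byte]): Boolean = true
  def removeAllStoredPieces(id: Long): Unit = println(s"releasing broadcast $id")

  def writeBlocks(id: Long, value: Array[Byte], blockSize: Int): Int =
    try {
      val blocks = blockify(value, blockSize)
      blocks.zipWithIndex.foreach { case (block, i) =>
        if (!storePiece(id, i, block)) {
          throw StoreException(s"Failed to store piece $i of broadcast $id")
        }
      }
      blocks.length
    } catch {
      case t: Throwable =>
        removeAllStoredPieces(id) // without this, the value and any stored pieces would leak
        throw t
    }
}
```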

### Why are the changes needed?
We run the Spark Thrift Server as a long-running service. A bad query submitted a heavy BroadcastNestedLoopJoin operation and drove the driver into full GC. We killed the bad query, but the driver's memory usage stayed high and full GCs remained frequent. Investigating the GC dump and logs showed that the broadcast may leak memory.

> 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
116G->112G(170G), 184.9121920 secs]
[Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 
116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
1: 676531691 72035438432 [B
2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
3: 99551 12018117568 [Ljava.lang.Object;
4: 26570 4349629040 [I
5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
6: 1708819 256299456 [C
7: 2338 179615208 [J
8: 1703669 54517408 java.lang.String
9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
10: 177396 25545024 java.net.URI
...

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested. A unit test is hard to write for this case, and the patch is straightforward.

Closes #29558 from LantaoJin/SPARK-32715.

Authored-by: LantaoJin 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/broadcast/TorrentBroadcast.scala  | 32 ++
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index cbd49e0..bd017d0 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -123,22 +123,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+

[spark] branch branch-3.0 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new fe6ff15  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
fe6ff15 is described below

commit fe6ff1577d18fc919db926894b32500e17e07ecb
Author: LantaoJin 
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

[SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

### What changes were proposed in this pull request?
In TorrentBroadcast.scala
```scala
L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
```
After the original value is stored successfully (TorrentBroadcast.scala L133), the subsequent `blockifyObject()` (L137) or piece-storing (L147) steps may still fail, and in that case there is no opportunity to release the broadcast from memory.

This patch removes all pieces of the broadcast when blockifying fails or when storing some pieces of a broadcast fails.

### Why are the changes needed?
We run the Spark Thrift Server as a long-running service. A bad query submitted a heavy BroadcastNestedLoopJoin operation and drove the driver into full GC. We killed the bad query, but the driver's memory usage stayed high and full GCs remained frequent. Investigating the GC dump and logs showed that the broadcast may leak memory.

> 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
116G->112G(170G), 184.9121920 secs]
[Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 
116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
1: 676531691 72035438432 [B
2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
3: 99551 12018117568 [Ljava.lang.Object;
4: 26570 4349629040 [I
5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
6: 1708819 256299456 [C
7: 2338 179615208 [J
8: 1703669 54517408 java.lang.String
9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
10: 177396 25545024 java.net.URI
...

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested. A unit test is hard to write for this case, and the patch is straightforward.

Closes #29558 from LantaoJin/SPARK-32715.

Authored-by: LantaoJin 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/broadcast/TorrentBroadcast.scala  | 32 ++
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 77fbbc0..1024d9b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -133,22 +133,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+

[spark] branch master updated (4fac6d5 -> 7a9b066)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4fac6d5  [SPARK-32871][BUILD] Append toMap to Map#filterKeys if the result of filter is concatenated with another Map for Scala 2.13
 add 7a9b066  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/broadcast/TorrentBroadcast.scala  | 32 ++
 1 file changed, 20 insertions(+), 12 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 5543f98  [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
5543f98 is described below

commit 5543f9840a024fd3e97fb64d8372c69a358e8977
Author: Ankur Dave 
AuthorDate: Mon Sep 14 18:12:42 2020 -0700

[SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.
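
As a rough sketch of that growth check (a Scala rendering of the idea only; `MAX_CAPACITY`, `growthThreshold`, and `canGrowArray` mirror fields of the Java `BytesToBytesMap`, but this is not its actual code):

```scala
// Sketch only: models the "cannot grow any further, so stop accepting keys" decision.
final class GrowthCheckSketch(maxCapacity: Long) {
  var arraySize: Long = 2L * 64          // two long-array entries per key
  var numKeys: Long = 0L
  var canGrowArray: Boolean = true
  // Grow once the map is half full (capacity in keys = arraySize / 2).
  def growthThreshold: Long = (arraySize / 2) / 2

  def onAppend(): Boolean = {
    if (!canGrowArray) {
      false                              // refuse new keys once growth is impossible
    } else {
      numKeys += 1
      if (numKeys >= growthThreshold) {
        if (arraySize / 2 < maxCapacity) {
          arraySize *= 2                 // normal case: double the backing array
        } else {
          // At MAX_CAPACITY the array cannot grow, so stop accepting new keys; this keeps
          // the map under its growth threshold and lets a spill reuse the long array for sorting.
          canGrowArray = false
        }
      }
      true
    }
  }
}
```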

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by 
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the 
test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set("spark.memory.offHeap.enabled", "true")
                .set("spark.memory.offHeap.size", Long.toString(25600 * 1024 * 1024L)));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29753 from ankurdave/SPARK-32872-2.4.

Authored-by: Ankur Dave 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..e2d258a 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
 longArray.set(pos * 2 + 1, keyHashcode);
 isDefined = true;
 
-// We use two array entries per key, so the array size is twice the capacity.
-// We should compare the current capacity of the array, instead of 

[spark] branch master updated (d58a4a3 -> 4fac6d5)

2020-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d58a4a3  [SPARK-32882][K8S] Remove python2 installation in K8s python 
image
 add 4fac6d5  [SPARK-32871][BUILD] Append toMap to Map#filterKeys if the 
result of filter is concatenated with another Map for Scala 2.13

No new revisions were added by this update.

Summary of changes:
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala| 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala| 4 ++--
 .../main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala  | 4 ++--
 .../main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala  | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)
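
The change itself is mechanical; the sketch below (illustrative only, with made-up option maps rather than code from the patch) shows why the extra `.toMap` is needed when cross-building for Scala 2.13, where `Map#filterKeys` returns a lazy `MapView` instead of a `Map`:

```scala
// Illustrative only; `sessionOptions` and `extraOptions` are made-up names.
// On Scala 2.13, Map#filterKeys returns a lazy MapView, so the concatenation
// below only stays a Map[String, String] on both 2.12 and 2.13 thanks to the
// explicit .toMap.
object FilterKeysToMapExample {
  def merged(
      sessionOptions: Map[String, String],
      extraOptions: Map[String, String]): Map[String, String] = {
    sessionOptions.filterKeys(!_.startsWith("path")).toMap ++ extraOptions
  }

  def main(args: Array[String]): Unit = {
    val out = merged(
      Map("path" -> "/tmp/in", "inferSchema" -> "true"),
      Map("header" -> "true"))
    println(out) // Map(inferSchema -> true, header -> true)
  }
}
```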


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (72550c3 -> d58a4a3)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 72550c3  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
 add d58a4a3  [SPARK-32882][K8S] Remove python2 installation in K8s python 
image

No new revisions were added by this update.

Summary of changes:
 .../docker/src/main/dockerfiles/spark/bindings/python/Dockerfile  | 8 +---
 1 file changed, 1 insertion(+), 7 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated (ad61af0 -> 94b9d6f)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.


from ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
 add 94b9d6f  Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at 
MAX_CAPACITY from exceeding growth threshold"

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 ++---
 1 file changed, 6 insertions(+), 15 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated (d33052b -> ad61af0)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d33052b  [SPARK-32708] Query optimization fails to reuse exchange with 
DataSourceV2
 add ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
ad61af0 is described below

commit ad61af01a98487176e7bc15ef31d697c85d863f3
Author: Ankur Dave 
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding 
growth threshold

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.

Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.

No.

Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by 
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the 
test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..4339e92 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
 longArray.set(pos * 2 + 1, keyHashcode);
 isDefined = true;
 
-// We use two array entries per key, so the array size is twice the capacity.
-// We should compare the current capacity 

[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 990d49a  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
990d49a is described below

commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77
Author: Ankur Dave 
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding 
growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by 
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the 
test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e02888..4036856 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
 

[spark] branch master updated (0696f046 -> 72550c3)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 
and 2.4.7 in HiveExternalCatalogVersionsSuite
 add 72550c3  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 828603d  [SPARK-32876][SQL] Change default fallback versions to 3.0.1 
and 2.4.7 in HiveExternalCatalogVersionsSuite
828603d is described below

commit 828603d3c0dbe2ad88a8f55c075b7163868cd6d9
Author: HyukjinKwon 
AuthorDate: Mon Sep 14 13:54:21 2020 -0700

[SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in 
HiveExternalCatalogVersionsSuite

### What changes were proposed in this pull request?

The Jenkins job fails to get the versions. This was fixed by adding 
temporary fallbacks at https://github.com/apache/spark/pull/28536.
This still doesn't work without the temporary fallbacks. See 
https://github.com/apache/spark/pull/29694

This PR adds new fallbacks since 2.3 is EOL and Spark 3.0.1 and 2.4.7 are 
released.

### Why are the changes needed?

To test correctly in Jenkins.

### Does this PR introduce _any_ user-facing change?

No, dev-only

### How was this patch tested?

Jenkins and GitHub Actions builds should test.

Closes #29748 from HyukjinKwon/SPARK-32876.

Authored-by: HyukjinKwon 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 0696f0467270969f40e9baa829533bdb55f4002a)
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index aa96fa0..cbfdb7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -243,7 +243,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
         .filter(_ < org.apache.spark.SPARK_VERSION)
     } catch {
       // do not throw exception during object initialization.
-      case NonFatal(_) => Seq("2.3.4", "2.4.5") // A temporary fallback to use a specific version
+      case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold"

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 94b9d6f  Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at 
MAX_CAPACITY from exceeding growth threshold"
94b9d6f is described below

commit 94b9d6f3c868689333ddc0ea20f1b80fb4e10a5e
Author: Dongjoon Hyun 
AuthorDate: Mon Sep 14 14:22:08 2020 -0700

Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from 
exceeding growth threshold"

This reverts commit ad61af01a98487176e7bc15ef31d697c85d863f3.
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 ++---
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 4339e92..5ab52cc 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,21 +756,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
     longArray.set(pos * 2 + 1, keyHashcode);
     isDefined = true;
 
-    // If the map has reached its growth threshold, try to grow it.
-    if (numKeys >= growthThreshold) {
-      // We use two array entries per key, so the array size is twice the capacity.
-      // We should compare the current capacity of the array, instead of its size.
-      if (longArray.size() / 2 < MAX_CAPACITY) {
-        try {
-          growAndRehash();
-        } catch (SparkOutOfMemoryError oom) {
-          canGrowArray = false;
-        }
-      } else {
-        // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from
-        // accepting any more new elements to make sure we don't exceed the load factor. If we
-        // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for
-        // sorting.
+    // We use two array entries per key, so the array size is twice the capacity.
+    // We should compare the current capacity of the array, instead of its size.
+    if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
+      try {
+        growAndRehash();
+      } catch (OutOfMemoryError oom) {
         canGrowArray = false;
       }
     }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
ad61af0 is described below

commit ad61af01a98487176e7bc15ef31d697c85d863f3
Author: Ankur Dave 
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding 
growth threshold

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.

Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.

No.

Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by 
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the 
test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  Test
  public void respectGrowthThresholdAtMaxCapacity() {
TestMemoryManager memoryManager2 =
new TestMemoryManager(
new SparkConf()
.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
.set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 
1024L)
.set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
.set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
TaskMemoryManager taskMemoryManager2 = new 
TaskMemoryManager(memoryManager2, 0);
final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page 
marker
final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 
1024, pageSizeBytes);

try {
  // Insert keys into the map until it stops accepting new keys.
  for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
if (i % (1024 * 1024) == 0) System.out.println("Inserting element " 
+ i);
final long[] value = new long[]{i};
BytesToBytesMap.Location loc = map.lookup(value, 
Platform.LONG_ARRAY_OFFSET, 8);
Assert.assertFalse(loc.isDefined());
boolean success =
loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, 
Platform.LONG_ARRAY_OFFSET, 8);
if (!success) break;
  }

  // The map should grow to its max capacity.
  long capacity = map.getArray().size() / 2;
  Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

  // The map should stop accepting new keys once it has reached its 
growth
  // threshold, which is half the max capacity.
  Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

  map.free();
} finally {
  map.free();
}
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..4339e92 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer 
{
 longArray.set(pos * 2 + 1, keyHashcode);
 isDefined = true;
 
-// We use two array entries per key, so the array size is twice the 
capacity.
-// We should compare the current capacity 

[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 990d49a  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
990d49a is described below

commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77
Author: Ankur Dave 
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding 
growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by 
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the 
test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e02888..4036856 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
 

[spark] branch master updated (0696f046 -> 72550c3)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 
and 2.4.7 in HiveExternalCatalogVersionsSuite
 add 72550c3  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 828603d  [SPARK-32876][SQL] Change default fallback versions to 3.0.1 
and 2.4.7 in HiveExternalCatalogVersionsSuite
828603d is described below

commit 828603d3c0dbe2ad88a8f55c075b7163868cd6d9
Author: HyukjinKwon 
AuthorDate: Mon Sep 14 13:54:21 2020 -0700

[SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in 
HiveExternalCatalogVersionsSuite

### What changes were proposed in this pull request?

The Jenkins job fails to get the versions. This was fixed by adding 
temporary fallbacks at https://github.com/apache/spark/pull/28536.
This still doesn't work without the temporary fallbacks. See 
https://github.com/apache/spark/pull/29694

This PR adds new fallbacks since 2.3 is EOL and Spark 3.0.1 and 2.4.7 are 
released.
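
For context, the suite computes the versions to test with a fetch-then-fallback
pattern roughly like the sketch below (paraphrased, not the exact suite code;
`fetchReleased` stands in for the HTTP lookup of released Spark versions):

```scala
import scala.util.control.NonFatal

object VersionFallbackSketch {
  // Paraphrased sketch of the pattern being updated, not the exact suite source.
  def versionsToTest(fetchReleased: () => Seq[String], currentVersion: String): Seq[String] =
    try {
      // Normally: released versions older than the version under test.
      fetchReleased().filter(_ < currentVersion)
    } catch {
      // Do not throw during object initialization; fall back to pinned releases.
      case NonFatal(_) => Seq("3.0.1", "2.4.7")
    }
}
```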

### Why are the changes needed?

To test correctly in Jenkins.

### Does this PR introduce _any_ user-facing change?

No, dev-only

### How was this patch tested?

Jenkins and GitHub Actions builds should test.

Closes #29748 from HyukjinKwon/SPARK-32876.

Authored-by: HyukjinKwon 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 0696f0467270969f40e9baa829533bdb55f4002a)
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index aa96fa0..cbfdb7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -243,7 +243,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
 .filter(_ < org.apache.spark.SPARK_VERSION)
 } catch {
   // do not throw exception during object initialization.
-  case NonFatal(_) => Seq("2.3.4", "2.4.5") // A temporary fallback to use a specific version
+  case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (7a17158 -> 0696f046)

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 7a17158  [SPARK-32868][SQL] Add more order irrelevant aggregates to 
EliminateSorts
 add 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 
and 2.4.7 in HiveExternalCatalogVersionsSuite

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

2020-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY 
from exceeding growth threshold
ad61af0 is described below

commit ad61af01a98487176e7bc15ef31d697c85d863f3
Author: Ankur Dave 
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding 
growth threshold

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.

Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.

No.

Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by 
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the 
test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..4339e92 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
 longArray.set(pos * 2 + 1, keyHashcode);
 isDefined = true;
 
-// We use two array entries per key, so the array size is twice the capacity.
-// We should compare the current capacity 

[spark] branch master updated (5e82548 -> 7a17158)

2020-09-14 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 5e82548  [SPARK-32844][SQL] Make `DataFrameReader.table` take the 
specified options for datasource v1
 add 7a17158  [SPARK-32868][SQL] Add more order irrelevant aggregates to 
EliminateSorts

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/dsl/package.scala|  6 +
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../catalyst/optimizer/EliminateSortsSuite.scala   | 26 --
 3 files changed, 26 insertions(+), 8 deletions(-)
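
For context, EliminateSorts removes sorts that cannot change the query result,
and a sort beneath an order-insensitive aggregate such as min or max is one such
case. A small illustration (local session and made-up data, not taken from the
patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min}

object EliminateSortsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("eliminate-sorts").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10L), (2, 20L), (3, 30L)).toDF("k", "v")
    // The orderBy cannot change the result of min/max, so the optimizer may drop
    // the Sort; inspect the optimized plan printed by explain(true).
    df.orderBy($"v".desc).agg(min($"v"), max($"v")).explain(true)
    spark.stop()
  }
}
```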


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (978f531 -> 5e82548)

2020-09-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 978f531  [SPARK-32854][SS] Minor code and doc improvement for 
stream-stream join
 add 5e82548  [SPARK-32844][SQL] Make `DataFrameReader.table` take the 
specified options for datasource v1

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala  |  7 +++--
 .../spark/sql/catalyst/catalog/interface.scala |  5 +++-
 .../org/apache/spark/sql/internal/SQLConf.scala| 12 
 .../execution/datasources/DataSourceStrategy.scala | 21 +++--
 .../execution/datasources/DataSourceUtils.scala| 34 ++
 .../sql/test/DataFrameReaderWriterSuite.scala  | 22 +-
 7 files changed, 89 insertions(+), 14 deletions(-)
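
Per the title, the change concerns reads of the following shape, where options
given to DataFrameReader previously did not reach a v1 source when going
through table(). A hedged illustration with a hypothetical table and option:

```scala
import org.apache.spark.sql.SparkSession

object TableOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("table-options").getOrCreate()
    // Hypothetical catalog table "events" backed by a v1 file source; the option
    // name is a stand-in for whatever that source supports. With this change the
    // option is forwarded to the v1 relation rather than ignored for .table().
    val df = spark.read.option("mergeSchema", "true").table("events")
    df.printSchema()
    spark.stop()
  }
}
```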


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (b121f0d -> 978f531)

2020-09-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b121f0d  [SPARK-32873][BUILD] Fix code which causes error when build 
with sbt and Scala 2.13
 add 978f531  [SPARK-32854][SS] Minor code and doc improvement for 
stream-stream join

No new revisions were added by this update.

Summary of changes:
 .../streaming/StreamingSymmetricHashJoinExec.scala | 56 --
 1 file changed, 30 insertions(+), 26 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2

2020-09-14 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new d33052b  [SPARK-32708] Query optimization fails to reuse exchange with 
DataSourceV2
d33052b is described below

commit d33052b8456818a71724638248cfb91f1aacf1d4
Author: mingjial 
AuthorDate: Mon Sep 14 15:51:13 2020 +0800

[SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2

### What changes were proposed in this pull request?

Override the doCanonicalize function of the DataSourceV2ScanExec class.

### Why are the changes needed?

The query plan of DataSourceV2 fails to reuse any exchange. This change makes 
DataSourceV2 plans better optimized and lets them reuse exchanges, as 
DataSourceV1 and Parquet sources already do.

Direct reason: the equals function of DataSourceV2ScanExec returns false 
when comparing two instances of the same V2 scan (same table, outputs, and 
pushed filters).

Actual cause: with the query plan's default doCanonicalize function, the 
pushed filters of DataSourceV2ScanExec are not canonicalized correctly; 
essentially, the expression IDs of the predicate columns are not normalized.
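
To make the cause concrete, here is a minimal, hypothetical sketch (not 
part of this patch; it assumes the Spark 2.4 catalyst classes are on the 
classpath): two attribute references to the same column carry distinct 
ExprIds, so otherwise-identical predicates compare unequal until they are 
normalized against the scan's own references, which is what the 
doCanonicalize override below does for pushedFilters.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.IntegerType

object ExprIdNormalizationSketch {
  def main(args: Array[String]): Unit = {
    // Two attribute references to the same logical column get fresh, distinct ExprIds.
    val i1 = AttributeReference("i", IntegerType)()
    val i2 = AttributeReference("i", IntegerType)()

    val p1 = GreaterThan(i1, Literal(6))
    val p2 = GreaterThan(i2, Literal(6))
    println(p1 == p2) // false: the ExprIds differ even though the predicates are the same

    // Normalizing each predicate against its own references (as the patch does for
    // pushedFilters) maps ExprIds to ordinals and makes the two comparable.
    val n1 = QueryPlan.normalizePredicates(Seq(p1), AttributeSeq(Seq(i1)))
    val n2 = QueryPlan.normalizePredicates(Seq(p2), AttributeSeq(Seq(i2)))
    println(n1 == n2) // true after canonicalization
  }
}
```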

[SPARK-32708](https://issues.apache.org/jira/browse/SPARK-32708) was not 
caught by my 
[tests](https://github.com/apache/spark/blob/5b1b9b39eb612cbf9ec67efd4e364adafcff66c4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L392) 
previously added for [SPARK-32609] because the issue only occurs when the 
same filtered column appears under different expression IDs (e.g., joining 
table t1 with t1).

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

unit test added

Closes #29564 from mingjialiu/branch-2.4.

Authored-by: mingjial 
Signed-off-by: Gengliang Wang 
---
 .../datasources/v2/DataSourceV2ScanExec.scala  | 12 +++
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 23 ++
 2 files changed, 35 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 9b70eec..2fe563a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
@@ -52,6 +53,17 @@ case class DataSourceV2ScanExec(
     case _ => false
   }
 
+  override def doCanonicalize(): DataSourceV2ScanExec = {
+    DataSourceV2ScanExec(
+      output.map(QueryPlan.normalizeExprId(_, output)),
+      source,
+      options,
+      QueryPlan.normalizePredicates(
+        pushedFilters,
+        AttributeSeq(pushedFilters.flatMap(_.references).distinct)),
+      reader)
+  }
+
   override def hashCode(): Int = {
     Seq(output, source, options).hashCode()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index ef0a8bd..92e0ac9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -393,6 +393,29 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization ") {
+    def getV2ScanExec(query: DataFrame): DataSourceV2ScanExec = {
+      query.queryExecution.executedPlan.collect {
+        case d: DataSourceV2ScanExec => d
+      }.head
+    }
+
+    val df1 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
+    val q1 = df1.select('i).filter('i > 6)
+    val df2 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
+    val q2 = df2.select('i).filter('i > 6)
+    val scan1 = getV2ScanExec(q1)
+    val scan2 = getV2ScanExec(q2)
+    assert(scan1.sameResult(scan2))
+    assert(scan1.doCanonicalize().equals(scan2.doCanonicalize()))
+
+    val q3 = df2.select('i).filter('i > 5)
+    val scan3 = getV2ScanExec(q3)
+    assert(!scan1.sameResult(scan3))
+    assert(!scan1.doCanonicalize().equals(scan3.doCanonicalize()))
+  }
+
 }
 
 class 

[spark] branch branch-2.4 updated (4072665 -> d33052b)

2020-09-14 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4072665  [SPARK-32865][DOC] python section in quickstart page doesn't 
display SPARK_VERSION correctly
 add d33052b  [SPARK-32708] Query optimization fails to reuse exchange with 
DataSourceV2

No new revisions were added by this update.

Summary of changes:
 .../datasources/v2/DataSourceV2ScanExec.scala  | 12 +++
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 23 ++
 2 files changed, 35 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (742fcff -> b121f0d)

2020-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 742fcff  [SPARK-32839][WINDOWS] Make Spark scripts working with the 
spaces in paths on Windows
 add b121f0d  [SPARK-32873][BUILD] Fix code which causes error when build 
with sbt and Scala 2.13

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala   | 4 ++--
 mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala | 2 +-
 mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala   | 6 +++---
 mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 +-
 .../scala/org/apache/spark/sql/execution/command/commands.scala | 4 ++--
 .../apache/spark/sql/execution/datasources/v2/V2CommandExec.scala   | 2 +-
 .../spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala  | 2 +-
 .../execution/streaming/state/SymmetricHashJoinStateManager.scala   | 4 ++--
 .../apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala| 2 +-
 .../org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala  | 2 +-
 10 files changed, 15 insertions(+), 15 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


