[spark] branch master updated (0811666 -> d8a0d85)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 0811666  [SPARK-32878][CORE] Avoid scheduling TaskSetManager which has no pending tasks
     add d8a0d85  [SPARK-32884][TESTS] Mark TPCDSQuery*Suite as ExtendedSQLTest

No new revisions were added by this update.

Summary of changes:
 sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala | 4 ++++
 1 file changed, 4 insertions(+)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (7a9b066 -> 0811666)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 7a9b066  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
     add 0811666  [SPARK-32878][CORE] Avoid scheduling TaskSetManager which has no pending tasks

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/org/apache/spark/scheduler/Pool.scala             | 4 +++-
 core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala      | 1 +
 .../src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala   | 3 +++
 4 files changed, 8 insertions(+), 2 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new cf0c907  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

cf0c907 is described below

commit cf0c907a1926c26f86b165ba7389ded375f0aa35
Author: LantaoJin
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

    [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

    ### What changes were proposed in this pull request?

    In TorrentBroadcast.scala:

    ```scala
    L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
    L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
    ```

    If the original value is stored successfully (TorrentBroadcast.scala: L133) but a subsequent
    step fails, either `blockifyObject()` (L137) or storing a piece (L147), there is no opportunity
    to release the broadcast from memory. This patch removes all pieces of a broadcast when
    blockifying fails or when storing some of its pieces fails.

    ### Why are the changes needed?

    We use the Spark thrift-server as a long-running service. A bad query submitted a heavy
    BroadcastNestLoopJoin operation and drove the driver into full GC. We killed the bad query,
    but the driver's memory usage stayed high and full GCs remained frequent. Investigating the
    GC dump and log showed that broadcast blocks may leak memory:

    > 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure) 2020-08-19T18:54:02.824-0700:
    > [Class Histogram (before full gc): 116G->112G(170G), 184.9121920 secs]
    > [Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B
    > Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
    >   1:  676531691  72035438432  [B
    >   2:  676502528  32472121344  org.apache.spark.sql.catalyst.expressions.UnsafeRow
    >   3:      99551  12018117568  [Ljava.lang.Object;
    >   4:      26570   4349629040  [I
    >   5:          6   3264536688  [Lorg.apache.spark.sql.catalyst.InternalRow;
    >   6:    1708819    256299456  [C
    >   7:       2338    179615208  [J
    >   8:    1703669     54517408  java.lang.String
    >   9:     103860     34896960  org.apache.spark.status.TaskDataWrapper
    >  10:     177396     25545024  java.net.URI
    > ...

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Manually tested. A unit test is hard to write and the patch is straightforward.

    Closes #29558 from LantaoJin/SPARK-32715.

    Authored-by: LantaoJin
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/broadcast/TorrentBroadcast.scala | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index cbd49e0..bd017d0 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -123,22 +123,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+
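The shape of the fix above — wrap the blockify-and-store steps in a `try` and, on any failure, remove every piece already written before re-throwing — can be sketched outside Spark with a toy in-memory block store. Everything here (`PieceStore`, this `writeBlocks`, the `failAt` switch used to inject a failure) is a hypothetical stand-in for illustration, not the actual `BlockManager` API:

```scala
import scala.collection.mutable

// Toy stand-in for BlockManager: stores named byte blocks in memory.
class PieceStore {
  private val blocks = mutable.LinkedHashMap[String, Array[Byte]]()
  def putBytes(id: String, data: Array[Byte]): Unit = blocks(id) = data
  // Drop every block whose id belongs to the given broadcast.
  def removeBroadcast(prefix: String): Unit =
    blocks.keys.filter(_.startsWith(prefix)).toList.foreach(blocks.remove)
  def size: Int = blocks.size
}

// On any failure while storing pieces, remove everything already stored
// so a half-written broadcast cannot linger in memory; then re-throw.
def writeBlocks(store: PieceStore, broadcastId: String,
                pieces: Seq[Array[Byte]], failAt: Int = -1): Int = {
  try {
    pieces.zipWithIndex.foreach { case (piece, i) =>
      if (i == failAt) {  // injected failure, mimicking a failed putBytes
        throw new RuntimeException(s"Failed to store piece$i of $broadcastId")
      }
      store.putBytes(s"${broadcastId}_piece$i", piece)
    }
    pieces.length
  } catch {
    case t: Throwable =>
      store.removeBroadcast(broadcastId)  // cleanup before propagating
      throw t
  }
}
```

Without the `catch` branch, a failure at piece 2 would leave pieces 0 and 1 resident in the store indefinitely, which is exactly the leak the commit message describes.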
[spark] branch branch-3.0 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fe6ff15  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

fe6ff15 is described below

commit fe6ff1577d18fc919db926894b32500e17e07ecb
Author: LantaoJin
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

    [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

    ### What changes were proposed in this pull request?

    In TorrentBroadcast.scala:

    ```scala
    L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
    L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
    ```

    If the original value is stored successfully (TorrentBroadcast.scala: L133) but a subsequent
    step fails, either `blockifyObject()` (L137) or storing a piece (L147), there is no opportunity
    to release the broadcast from memory. This patch removes all pieces of a broadcast when
    blockifying fails or when storing some of its pieces fails.

    ### Why are the changes needed?

    We use the Spark thrift-server as a long-running service. A bad query submitted a heavy
    BroadcastNestLoopJoin operation and drove the driver into full GC. We killed the bad query,
    but the driver's memory usage stayed high and full GCs remained frequent. Investigating the
    GC dump and log showed that broadcast blocks may leak memory:

    > 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure) 2020-08-19T18:54:02.824-0700:
    > [Class Histogram (before full gc): 116G->112G(170G), 184.9121920 secs]
    > [Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B
    > Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
    >   1:  676531691  72035438432  [B
    >   2:  676502528  32472121344  org.apache.spark.sql.catalyst.expressions.UnsafeRow
    >   3:      99551  12018117568  [Ljava.lang.Object;
    >   4:      26570   4349629040  [I
    >   5:          6   3264536688  [Lorg.apache.spark.sql.catalyst.InternalRow;
    >   6:    1708819    256299456  [C
    >   7:       2338    179615208  [J
    >   8:    1703669     54517408  java.lang.String
    >   9:     103860     34896960  org.apache.spark.status.TaskDataWrapper
    >  10:     177396     25545024  java.net.URI
    > ...

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Manually tested. A unit test is hard to write and the patch is straightforward.

    Closes #29558 from LantaoJin/SPARK-32715.

    Authored-by: LantaoJin
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/broadcast/TorrentBroadcast.scala | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 77fbbc0..1024d9b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -133,22 +133,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+
[spark] branch master updated (4fac6d5 -> 7a9b066)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4fac6d5  [SPARK-32871][BUILD] Append toMap to Map#filterKeys if the result of filter is concatenated with another Map for Scala 2.13
     add 7a9b066  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/broadcast/TorrentBroadcast.scala | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (4fac6d5 -> 7a9b066)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 4fac6d5  [SPARK-32871][BUILD] Append toMap to Map#filterKeys if the result of filter is concatenated with another Map for Scala 2.13
  add 7a9b066  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/broadcast/TorrentBroadcast.scala | 32 ++
 1 file changed, 20 insertions(+), 12 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new cf0c907  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
cf0c907 is described below

commit cf0c907a1926c26f86b165ba7389ded375f0aa35
Author: LantaoJin
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

    [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

    ### What changes were proposed in this pull request?
    In TorrentBroadcast.scala:
    ```scala
    L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
    L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
    ```
    Even after the original value is stored successfully (L133), the subsequent `blockifyObject()` (L137) or piece-storing (L147) steps can fail, and there is then no opportunity to release the broadcast from memory. This patch removes all pieces of the broadcast when blockifying fails or when some pieces of a broadcast fail to store.

    ### Why are the changes needed?
    We use the Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and drove the driver into full GC. We killed the bad query, but the driver's memory usage stayed high and full GCs remained frequent. Investigating the GC dump and log, we found that broadcasts can leak memory:

    > 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure) 2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc): 116G->112G(170G), 184.9121920 secs] [Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
    >   1:  676531691  72035438432  [B
    >   2:  676502528  32472121344  org.apache.spark.sql.catalyst.expressions.UnsafeRow
    >   3:      99551  12018117568  [Ljava.lang.Object;
    >   4:      26570   4349629040  [I
    >   5:          6   3264536688  [Lorg.apache.spark.sql.catalyst.InternalRow;
    >   6:    1708819    256299456  [C
    >   7:       2338    179615208  [J
    >   8:    1703669     54517408  java.lang.String
    >   9:     103860     34896960  org.apache.spark.status.TaskDataWrapper
    >  10:     177396     25545024  java.net.URI
    ...

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Manually tested. A unit test is hard to write here, and the patch is straightforward.

    Closes #29558 from LantaoJin/SPARK-32715.

    Authored-by: LantaoJin
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/broadcast/TorrentBroadcast.scala | 32 ++
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index cbd49e0..bd017d0 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -123,22 +123,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+
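The cleanup pattern this patch adds — wrap the piece-writing loop in a try/catch and, on any failure, remove everything already stored before rethrowing — can be sketched outside Spark. This is a hedged toy model: `PieceStore`, `writeBlocks`, and the simulated failure are illustrative stand-ins, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;

class BroadcastWriteSketch {
    /** Minimal in-memory stand-in for the block manager (not a Spark class). */
    static class PieceStore {
        final List<String> stored = new ArrayList<>();
        final int failAtPiece; // simulate a store failure at this index; -1 = never fail

        PieceStore(int failAtPiece) { this.failAtPiece = failAtPiece; }

        boolean putPiece(int i, byte[] piece) {
            if (i == failAtPiece) return false; // simulated failure, like putBytes returning false
            stored.add("piece" + i);
            return true;
        }

        void removeAll() { stored.clear(); } // analogous to removing all pieces of the broadcast
    }

    /**
     * Stores all pieces; on any failure, removes the pieces already stored
     * before rethrowing, mirroring the try/catch the patch introduces.
     */
    static int writeBlocks(PieceStore store, byte[][] pieces) {
        try {
            for (int i = 0; i < pieces.length; i++) {
                if (!store.putPiece(i, pieces[i])) {
                    throw new RuntimeException("Failed to store piece" + i);
                }
            }
            return pieces.length;
        } catch (RuntimeException e) {
            store.removeAll(); // release the partial state instead of leaking it
            throw e;
        }
    }

    public static void main(String[] args) {
        PieceStore ok = new PieceStore(-1);
        System.out.println("stored " + writeBlocks(ok, new byte[4][8]) + " pieces");

        PieceStore bad = new PieceStore(2); // fail on the third piece
        try {
            writeBlocks(bad, new byte[4][8]);
        } catch (RuntimeException e) {
            System.out.println("failed; pieces remaining: " + bad.stored.size());
        }
    }
}
```

Without the catch block, the failing path would leave `piece0` and `piece1` behind — the analogue of the leaked broadcast pieces described in the report.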
[spark] branch branch-3.0 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fe6ff15  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
fe6ff15 is described below

commit fe6ff1577d18fc919db926894b32500e17e07ecb
Author: LantaoJin
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

    [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

    ### What changes were proposed in this pull request?
    In TorrentBroadcast.scala:
    ```scala
    L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
    L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
    ```
    Even after the original value is stored successfully (L133), the subsequent `blockifyObject()` (L137) or piece-storing (L147) steps can fail, and there is then no opportunity to release the broadcast from memory. This patch removes all pieces of the broadcast when blockifying fails or when some pieces of a broadcast fail to store.

    ### Why are the changes needed?
    We use the Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and drove the driver into full GC. We killed the bad query, but the driver's memory usage stayed high and full GCs remained frequent. Investigating the GC dump and log, we found that broadcasts can leak memory:

    > 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure) 2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc): 116G->112G(170G), 184.9121920 secs] [Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
    >   1:  676531691  72035438432  [B
    >   2:  676502528  32472121344  org.apache.spark.sql.catalyst.expressions.UnsafeRow
    >   3:      99551  12018117568  [Ljava.lang.Object;
    >   4:      26570   4349629040  [I
    >   5:          6   3264536688  [Lorg.apache.spark.sql.catalyst.InternalRow;
    >   6:    1708819    256299456  [C
    >   7:       2338    179615208  [J
    >   8:    1703669     54517408  java.lang.String
    >   9:     103860     34896960  org.apache.spark.status.TaskDataWrapper
    >  10:     177396     25545024  java.net.URI
    ...

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Manually tested. A unit test is hard to write here, and the patch is straightforward.

    Closes #29558 from LantaoJin/SPARK-32715.

    Authored-by: LantaoJin
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/broadcast/TorrentBroadcast.scala | 32 ++
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 77fbbc0..1024d9b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -133,22 +133,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+
[spark] branch branch-2.4 updated: [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 5543f98  [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
5543f98 is described below

commit 5543f9840a024fd3e97fb64d8372c69a358e8977
Author: Ankur Dave
AuthorDate: Mon Sep 14 18:12:42 2020 -0700

    [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

    ### What changes were proposed in this pull request?
    When a BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. The map therefore keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

    This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

    ### Why are the changes needed?
    Without this change, hash aggregations fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million) and the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and `spark.sql.shuffle.partitions=1`.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...
      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 = new TestMemoryManager(
          new SparkConf()
            .set("spark.memory.offHeap.enabled", "true")
            .set("spark.memory.offHeap.size", Long.toString(25600 * 1024 * 1024L)));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
              loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }

          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```

    Closes #29753 from ankurdave/SPARK-32872-2.4.

    Authored-by: Ankur Dave
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..e2d258a 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;
-      // We use two array entries per key, so the array size is twice the capacity.
-      // We should compare the current capacity of the array, instead of
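The guard this patch introduces can be illustrated with a toy model of the map's growth logic. This is a hedged sketch, not the real BytesToBytesMap: here `maxCapacity` is a constructor parameter so the behavior is testable (Spark's is the fixed constant `MAX_CAPACITY`), and the load factor and doubling are simplified.

```java
// Toy model of the SPARK-32872 fix: when the map hits its growth threshold
// but is already at max capacity, flip canGrowArray to false so it refuses
// further keys instead of silently exceeding the threshold. All names are
// simplified stand-ins for BytesToBytesMap internals, not Spark APIs.
class GrowthGuardSketch {
    final long maxCapacity; // stand-in for BytesToBytesMap.MAX_CAPACITY
    long capacity;          // current slot count (the real map doubles a long array)
    long numKeys;
    boolean canGrowArray = true;

    GrowthGuardSketch(long initialCapacity, long maxCapacity) {
        this.capacity = initialCapacity;
        this.maxCapacity = maxCapacity;
    }

    long growthThreshold() { return capacity / 2; } // grow once half the slots are used

    /** Tries to insert one key; returns false once the map refuses new keys. */
    boolean append() {
        if (!canGrowArray) return false;
        numKeys++;
        if (numKeys >= growthThreshold()) {
            if (capacity < maxCapacity) {
                capacity = Math.min(capacity * 2, maxCapacity); // normal growth path
            } else {
                // The fix: the array cannot grow any further, so stop
                // accepting keys rather than leaving canGrowArray true.
                canGrowArray = false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        GrowthGuardSketch map = new GrowthGuardSketch(4, 16);
        int accepted = 0;
        while (map.append()) accepted++;
        // Accepts exactly maxCapacity / 2 keys, mirroring the manual test above.
        System.out.println(accepted + " keys accepted, canGrowArray=" + map.canGrowArray);
    }
}
```

With `maxCapacity = 16` the model accepts exactly 8 keys — half the max capacity — which is the invariant the manual `respectGrowthThresholdAtMaxCapacity` test checks at full scale.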
[spark] branch branch-2.4 updated: [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 5543f98  [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
5543f98 is described below

commit 5543f9840a024fd3e97fb64d8372c69a358e8977
Author: Ankur Dave
AuthorDate: Mon Sep 14 18:12:42 2020 -0700

    [SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

    ### What changes were proposed in this pull request?

    When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

    This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

    ### Why are the changes needed?

    Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...

      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 = new TestMemoryManager(
            new SparkConf()
                .set("spark.memory.offHeap.enabled", "true")
                .set("spark.memory.offHeap.size", Long.toString(25600 * 1024 * 1024L)));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map =
            new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
                loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }

          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```

    Closes #29753 from ankurdave/SPARK-32872-2.4.

    Authored-by: Ankur Dave
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..e2d258a 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;
-// We use two array entries per key, so the array size is twice the capacity.
-// We should compare the current capacity of the array, instead of
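The growth check described in the commit message can be sketched as a small standalone model. This is illustrative only, not the actual Spark source: the class and field names here (`GrowthCheckModel`, `maybeGrow`, `arraySize`) are hypothetical stand-ins for the logic in `org.apache.spark.unsafe.map.BytesToBytesMap.append()`.

```java
// Simplified model of the post-fix growth check in BytesToBytesMap.
// Hypothetical names; only the control flow mirrors the described fix.
public class GrowthCheckModel {
    // Matches BytesToBytesMap.MAX_CAPACITY = 1 << 29 entries.
    static final long MAX_CAPACITY = 1L << 29;

    long numKeys;
    long growthThreshold;
    long arraySize;              // stand-in for longArray.size(): two slots per key
    boolean canGrowArray = true;

    // Returns true if the map grew. Mirrors the fixed control flow:
    // at MAX_CAPACITY the map must also stop accepting new keys.
    boolean maybeGrow() {
        if (numKeys < growthThreshold) {
            return false;                    // below threshold: nothing to do
        }
        // Compare the current *capacity* (size / 2), not the raw array size.
        if (arraySize / 2 < MAX_CAPACITY) {
            arraySize *= 2;                  // stand-in for growAndRehash()
            growthThreshold *= 2;
            return true;
        }
        // The map is at MAX_CAPACITY and cannot grow. The fix: clear
        // canGrowArray so no more keys are accepted, keeping the load factor
        // bounded and letting UnsafeKVExternalSorter reuse the array on spill.
        canGrowArray = false;
        return false;
    }

    public static void main(String[] args) {
        GrowthCheckModel m = new GrowthCheckModel();
        m.arraySize = 2 * MAX_CAPACITY;      // already at max capacity
        m.numKeys = MAX_CAPACITY / 2;        // just hit the growth threshold
        m.growthThreshold = MAX_CAPACITY / 2;
        boolean grew = m.maybeGrow();
        // Before the fix, canGrowArray would have stayed true here.
        System.out.println("grew=" + grew + " canGrowArray=" + m.canGrowArray);
    }
}
```

Before the fix, the `canGrowArray = false` assignment in the `else` branch was missing, so the model above would keep reporting `canGrowArray=true` even though growth was impossible.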
[spark] branch master updated (d58a4a3 -> 4fac6d5)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from d58a4a3  [SPARK-32882][K8S] Remove python2 installation in K8s python image
  add 4fac6d5  [SPARK-32871][BUILD] Append toMap to Map#filterKeys if the result of filter is concatenated with another Map for Scala 2.13

No new revisions were added by this update.

Summary of changes:
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala   | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala   | 4 ++--
 .../main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala | 4 ++--
 .../main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (72550c3 -> d58a4a3)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 72550c3  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
  add d58a4a3  [SPARK-32882][K8S] Remove python2 installation in K8s python image

No new revisions were added by this update.

Summary of changes:
 .../docker/src/main/dockerfiles/spark/bindings/python/Dockerfile | 8 +---
 1 file changed, 1 insertion(+), 7 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated (ad61af0 -> 94b9d6f)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
  add 94b9d6f  Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold"

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 ++---
 1 file changed, 6 insertions(+), 15 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated (d33052b -> ad61af0)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from d33052b  [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2
  add ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ad61af0  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
ad61af0 is described below

commit ad61af01a98487176e7bc15ef31d697c85d863f3
Author: Ankur Dave
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

    [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

    When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

    This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

    Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

    No.

    Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...

      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 = new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map =
            new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
                loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }

          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```

    Closes #29744 from ankurdave/SPARK-32872.

    Authored-by: Ankur Dave
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..4339e92 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;
-// We use two array entries per key, so the array size is twice the capacity.
-// We should compare the current capacity
[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 990d49a  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
990d49a is described below

commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77
Author: Ankur Dave
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

    [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

    ### What changes were proposed in this pull request?

    When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

    This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

    ### Why are the changes needed?

    Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...

      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 = new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map =
            new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
                loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }

          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```

    Closes #29744 from ankurdave/SPARK-32872.

    Authored-by: Ankur Dave
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e02888..4036856 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
[spark] branch master updated (0696f046 -> 72550c3)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 0696f046  [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite
  add 72550c3   [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
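The "268 million" figure quoted in the SPARK-32872 commit messages follows directly from the constants involved. A minimal arithmetic check, assuming `MAX_CAPACITY = 1 << 29` and the default 0.5 load factor described in the commit message:

```java
// Why hash aggregations start failing above ~268 million groups per task:
// a map at MAX_CAPACITY stops accepting keys at its growth threshold,
// which is half the capacity.
public class CapacityMath {
    static long maxKeysPerTask() {
        long maxCapacity = 1L << 29;   // BytesToBytesMap.MAX_CAPACITY entries
        return maxCapacity / 2;        // growth threshold = capacity * 0.5
    }

    public static void main(String[] args) {
        // 2^28 = 268435456, i.e. approximately 268 million
        System.out.println(maxKeysPerTask());
    }
}
```

This matches the commit message's bound of `MAX_CAPACITY / 2 = 2^28`: a 1-billion-distinct-value aggregation with `spark.sql.shuffle.partitions=1` exceeds it roughly fourfold.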
[spark] branch branch-3.0 updated: [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 828603d [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite

828603d is described below

commit 828603d3c0dbe2ad88a8f55c075b7163868cd6d9
Author: HyukjinKwon
AuthorDate: Mon Sep 14 13:54:21 2020 -0700

    [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite

    ### What changes were proposed in this pull request?

    The Jenkins job fails to fetch the released Spark versions. This was worked around by adding temporary fallbacks in https://github.com/apache/spark/pull/28536, and the suite still does not work without them; see https://github.com/apache/spark/pull/29694. This PR updates the fallbacks, since Spark 2.3 is EOL and Spark 3.0.1 and 2.4.7 have been released.

    ### Why are the changes needed?

    To make the suite test the correct versions in Jenkins.

    ### Does this PR introduce _any_ user-facing change?

    No, dev-only.

    ### How was this patch tested?

    The Jenkins and GitHub Actions builds should exercise it.

    Closes #29748 from HyukjinKwon/SPARK-32876.

    Authored-by: HyukjinKwon
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 0696f0467270969f40e9baa829533bdb55f4002a)
    Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index aa96fa0..cbfdb7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -243,7 +243,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
         .filter(_ < org.apache.spark.SPARK_VERSION)
     } catch {
       // do not throw exception during object initialization.
-      case NonFatal(_) => Seq("2.3.4", "2.4.5") // A temporary fallback to use a specific version
+      case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version
     }
   }
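The diff above uses a pattern worth noting: compute a value dynamically, but never let object initialization throw, falling back to pinned values on any failure. A minimal standalone sketch of that pattern follows; the class and method names here are illustrative, not Spark's.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the fallback pattern in HiveExternalCatalogVersionsSuite:
// try to discover released versions dynamically, and on any failure return a
// pinned list so that static/object initialization never throws.
public class VersionFallback {
    static List<String> testingVersions(Supplier<List<String>> fetch) {
        try {
            return fetch.get();
        } catch (RuntimeException e) {
            // Temporary fallback mirroring the values patched into the suite.
            return Arrays.asList("3.0.1", "2.4.7");
        }
    }

    public static void main(String[] args) {
        // Simulate a fetch failure: the pinned fallback versions are returned.
        List<String> vs = testingVersions(() -> {
            throw new RuntimeException("fetch failed");
        });
        System.out.println(vs);
    }
}
```

The trade-off is the one the commit message describes: pinned fallbacks go stale as releases EOL, so they need occasional manual bumps like this one.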
[spark] branch branch-2.4 updated: Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 94b9d6f Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold"

94b9d6f is described below

commit 94b9d6f3c868689333ddc0ea20f1b80fb4e10a5e
Author: Dongjoon Hyun
AuthorDate: Mon Sep 14 14:22:08 2020 -0700

    Revert "[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold"

    This reverts commit ad61af01a98487176e7bc15ef31d697c85d863f3.
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 ++++++------------
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 4339e92..5ab52cc 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,21 +756,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
     longArray.set(pos * 2 + 1, keyHashcode);
     isDefined = true;
-    // If the map has reached its growth threshold, try to grow it.
-    if (numKeys >= growthThreshold) {
-      // We use two array entries per key, so the array size is twice the capacity.
-      // We should compare the current capacity of the array, instead of its size.
-      if (longArray.size() / 2 < MAX_CAPACITY) {
-        try {
-          growAndRehash();
-        } catch (SparkOutOfMemoryError oom) {
-          canGrowArray = false;
-        }
-      } else {
-        // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from
-        // accepting any more new elements to make sure we don't exceed the load factor. If we
-        // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for
-        // sorting.
+    // We use two array entries per key, so the array size is twice the capacity.
+    // We should compare the current capacity of the array, instead of its size.
+    if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
+      try {
+        growAndRehash();
+      } catch (OutOfMemoryError oom) {
         canGrowArray = false;
       }
     }

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ad61af0 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

ad61af0 is described below

commit ad61af01a98487176e7bc15ef31d697c85d863f3
Author: Ankur Dave
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

    [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

    When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. The map therefore keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

    This PR fixes the issue by setting `canGrowArray` to false in this case, which prevents the map from accepting new elements when it cannot grow to accommodate them.

    Without this change, hash aggregations fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million) and the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and `spark.sql.shuffle.partitions=1`.

    Reproducing this issue requires building a very large BytesToBytesMap. Because that is infeasible in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, it passes in 1.5 minutes.

    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...
      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 = new TestMemoryManager(
          new SparkConf()
            .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
            .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
            .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
            .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
              loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }

          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```

    Closes #29744 from ankurdave/SPARK-32872.

    Authored-by: Ankur Dave
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..4339e92 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
     longArray.set(pos * 2 + 1, keyHashcode);
     isDefined = true;
-    // We use two array entries per key, so the array size is twice the capacity.
-    // We should compare the current capacity
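The decision logic this fix introduces can be sketched in isolation. The following is a simplified, standalone model of the insert-time growth check, not Spark's actual BytesToBytesMap; the field names mirror the commit message, and the `MAX_CAPACITY` value is assumed to match Spark's `1 << 29`.

```java
// Simplified model of the SPARK-32872 growth decision in BytesToBytesMap.append().
// Returns whether the map may keep accepting keys after the current insert.
public class GrowthDecision {
    // Assumed to mirror BytesToBytesMap.MAX_CAPACITY (1 << 29 entries).
    static final long MAX_CAPACITY = 1L << 29;

    static boolean canStillGrow(long numKeys, long growthThreshold, long longArraySize) {
        if (numKeys < growthThreshold) {
            return true; // below the load-factor threshold, nothing to do
        }
        // Two array entries per key, so capacity is half the array size.
        long capacity = longArraySize / 2;
        if (capacity < MAX_CAPACITY) {
            return true; // the real map would call growAndRehash() here
        }
        // The fix: at MAX_CAPACITY the map cannot grow, so it must stop
        // accepting keys instead of silently exceeding its growth threshold.
        return false;
    }

    public static void main(String[] args) {
        // Below threshold: keep accepting keys.
        assert canStillGrow(100, 512, 2048);
        // At threshold but still growable: keep accepting (after a grow).
        assert canStillGrow(512, 512, 2048);
        // At threshold and at MAX_CAPACITY: refuse further inserts.
        assert !canStillGrow(MAX_CAPACITY / 2, MAX_CAPACITY / 2, 2 * MAX_CAPACITY);
    }
}
```

Before the fix, the third case fell through with `canGrowArray` still true, which is exactly the state that later broke the array reuse in UnsafeKVExternalSorter.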
[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 990d49a [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

990d49a is described below

commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77
Author: Ankur Dave
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

    [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

    ### What changes were proposed in this pull request?

    When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. The map therefore keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

    This PR fixes the issue by setting `canGrowArray` to false in this case, which prevents the map from accepting new elements when it cannot grow to accommodate them.

    ### Why are the changes needed?

    Without this change, hash aggregations fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million) and the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and `spark.sql.shuffle.partitions=1`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Reproducing this issue requires building a very large BytesToBytesMap. Because that is infeasible in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, it passes in 1.5 minutes.

    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...
      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 = new TestMemoryManager(
          new SparkConf()
            .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
            .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
            .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
            .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
              loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }

          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```

    Closes #29744 from ankurdave/SPARK-32872.

    Authored-by: Ankur Dave
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e02888..4036856 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
[spark] branch master updated (7a17158 -> 0696f046)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 7a17158  [SPARK-32868][SQL] Add more order irrelevant aggregates to EliminateSorts
     add 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ad61af0 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold ad61af0 is described below commit ad61af01a98487176e7bc15ef31d697c85d863f3 Author: Ankur Dave AuthorDate: Mon Sep 14 13:58:15 2020 -0700 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...] This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them. Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`. No. Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes. 
```java public abstract class AbstractBytesToBytesMapSuite { // ... Test public void respectGrowthThresholdAtMaxCapacity() { TestMemoryManager memoryManager2 = new TestMemoryManager( new SparkConf() .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true) .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L) .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0); final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes); try { // Insert keys into the map until it stops accepting new keys. for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) { if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i); final long[] value = new long[]{i}; BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8); Assert.assertFalse(loc.isDefined()); boolean success = loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8); if (!success) break; } // The map should grow to its max capacity. long capacity = map.getArray().size() / 2; Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY); // The map should stop accepting new keys once it has reached its growth // threshold, which is half the max capacity. Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2); map.free(); } finally { map.free(); } } } ``` Closes #29744 from ankurdave/SPARK-32872. 
Authored-by: Ankur Dave Signed-off-by: Dongjoon Hyun (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f) Signed-off-by: Dongjoon Hyun --- .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++-- 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 5ab52cc..4339e92 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer { longArray.set(pos * 2 + 1, keyHashcode); isDefined = true; -// We use two array entries per key, so the array size is twice the capacity. -// We should compare the current capacity
[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 990d49a [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold 990d49a is described below commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77 Author: Ankur Dave AuthorDate: Mon Sep 14 13:58:15 2020 -0700 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold ### What changes were proposed in this pull request? When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...] This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them. ### Why are the changes needed? Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Reproducing this issue requires building a very large BytesToBytesMap. 
Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes. ```java public abstract class AbstractBytesToBytesMapSuite { // ... Test public void respectGrowthThresholdAtMaxCapacity() { TestMemoryManager memoryManager2 = new TestMemoryManager( new SparkConf() .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true) .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L) .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0); final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes); try { // Insert keys into the map until it stops accepting new keys. for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) { if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i); final long[] value = new long[]{i}; BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8); Assert.assertFalse(loc.isDefined()); boolean success = loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8); if (!success) break; } // The map should grow to its max capacity. long capacity = map.getArray().size() / 2; Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY); // The map should stop accepting new keys once it has reached its growth // threshold, which is half the max capacity. Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2); map.free(); } finally { map.free(); } } } ``` Closes #29744 from ankurdave/SPARK-32872. 
Authored-by: Ankur Dave Signed-off-by: Dongjoon Hyun (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f) Signed-off-by: Dongjoon Hyun --- .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++-- 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 6e02888..4036856 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
[spark] branch master updated (0696f046 -> 72550c3)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite add 72550c3 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold No new revisions were added by this update. Summary of changes: .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++-- 1 file changed, 15 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 828603d [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite 828603d is described below commit 828603d3c0dbe2ad88a8f55c075b7163868cd6d9 Author: HyukjinKwon AuthorDate: Mon Sep 14 13:54:21 2020 -0700 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite ### What changes were proposed in this pull request? The Jenkins job fails to get the versions. This was fixed by adding temporary fallbacks at https://github.com/apache/spark/pull/28536. This still doesn't work without the temporary fallbacks. See https://github.com/apache/spark/pull/29694 This PR adds new fallbacks since 2.3 is EOL and Spark 3.0.1 and 2.4.7 are released. ### Why are the changes needed? To test correctly in Jenkins. ### Does this PR introduce _any_ user-facing change? No, dev-only ### How was this patch tested? Jenkins and GitHub Actions builds should test. Closes #29748 from HyukjinKwon/SPARK-32876. 
Authored-by: HyukjinKwon Signed-off-by: Dongjoon Hyun (cherry picked from commit 0696f0467270969f40e9baa829533bdb55f4002a) Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index aa96fa0..cbfdb7f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -243,7 +243,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils { .filter(_ < org.apache.spark.SPARK_VERSION) } catch { // do not throw exception during object initialization. - case NonFatal(_) => Seq("2.3.4", "2.4.5") // A temporary fallback to use a specific version + case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (7a17158 -> 0696f046)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 7a17158 [SPARK-32868][SQL] Add more order irrelevant aggregates to EliminateSorts add 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ad61af0 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold ad61af0 is described below commit ad61af01a98487176e7bc15ef31d697c85d863f3 Author: Ankur Dave AuthorDate: Mon Sep 14 13:58:15 2020 -0700 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...] This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them. Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`. No. Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes. 
```java public abstract class AbstractBytesToBytesMapSuite { // ... Test public void respectGrowthThresholdAtMaxCapacity() { TestMemoryManager memoryManager2 = new TestMemoryManager( new SparkConf() .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true) .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L) .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0); final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes); try { // Insert keys into the map until it stops accepting new keys. for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) { if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i); final long[] value = new long[]{i}; BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8); Assert.assertFalse(loc.isDefined()); boolean success = loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8); if (!success) break; } // The map should grow to its max capacity. long capacity = map.getArray().size() / 2; Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY); // The map should stop accepting new keys once it has reached its growth // threshold, which is half the max capacity. Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2); map.free(); } finally { map.free(); } } } ``` Closes #29744 from ankurdave/SPARK-32872. 
Authored-by: Ankur Dave
Signed-off-by: Dongjoon Hyun
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ab52cc..4339e92 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -756,12 +756,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;
-      // We use two array entries per key, so the array size is twice the capacity.
-      // We should compare the current capacity
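The diff above is truncated mid-hunk. As a hedged illustration of the shape of the fix (not the actual BytesToBytesMap code; the class, the tiny `MAX_CAPACITY` stand-in, and the simplified `append` are invented for this sketch), the corrected growth logic can be simulated as:

```java
// A minimal, self-contained simulation of the fixed growth logic.
// MAX_CAPACITY is a tiny stand-in (the real BytesToBytesMap uses a much
// larger constant); all names here are hypothetical.
public class GrowthThresholdDemo {
    static final int MAX_CAPACITY = 8;

    int capacity = 2;            // simulated pointer-array size / 2
    int numKeys = 0;
    boolean canGrowArray = true; // the flag SPARK-32872 now clears at max capacity

    int growthThreshold() { return capacity / 2; } // load factor 0.5, as in the real map

    // Mirrors the contract of Location.append: returns false once the map
    // refuses new keys.
    boolean append() {
        if (!canGrowArray && numKeys >= growthThreshold()) {
            return false; // with the fix, we stop here instead of overfilling
        }
        numKeys++;
        if (numKeys >= growthThreshold()) {
            if (capacity < MAX_CAPACITY) {
                capacity *= 2; // stands in for growAndRehash()
            } else {
                // The fix: the array cannot grow, so stop accepting new keys.
                canGrowArray = false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        GrowthThresholdDemo map = new GrowthThresholdDemo();
        int accepted = 0;
        while (map.append()) accepted++;
        System.out.println(accepted); // prints 4, i.e. MAX_CAPACITY / 2
    }
}
```

Once the simulated map is at max capacity and hits its growth threshold, `canGrowArray` flips to false and further appends are rejected, matching the manual test's assertion that `numKeys()` stops at `MAX_CAPACITY / 2`.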
[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 990d49a [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

990d49a is described below

commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77
Author: Ankur Dave
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has ty [...]

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage. For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap.
Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...

  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 = new TestMemoryManager(
      new SparkConf()
        .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
        .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
        .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
        .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 800 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
          loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.
Authored-by: Ankur Dave
Signed-off-by: Dongjoon Hyun
(cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java | 21 +++--
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e02888..4036856 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
[spark] branch master updated (0696f046 -> 72550c3)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite add 72550c3 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold No new revisions were added by this update. Summary of changes: .../apache/spark/unsafe/map/BytesToBytesMap.java| 21 +++-- 1 file changed, 15 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 828603d [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite 828603d is described below commit 828603d3c0dbe2ad88a8f55c075b7163868cd6d9 Author: HyukjinKwon AuthorDate: Mon Sep 14 13:54:21 2020 -0700 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite ### What changes were proposed in this pull request? The Jenkins job fails to get the versions. This was fixed by adding temporary fallbacks at https://github.com/apache/spark/pull/28536. This still doesn't work without the temporary fallbacks. See https://github.com/apache/spark/pull/29694 This PR adds new fallbacks since 2.3 is EOL and Spark 3.0.1 and 2.4.7 are released. ### Why are the changes needed? To test correctly in Jenkins. ### Does this PR introduce _any_ user-facing change? No, dev-only ### How was this patch tested? Jenkins and GitHub Actions builds should test. Closes #29748 from HyukjinKwon/SPARK-32876. 
Authored-by: HyukjinKwon
Signed-off-by: Dongjoon Hyun
(cherry picked from commit 0696f0467270969f40e9baa829533bdb55f4002a)
Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index aa96fa0..cbfdb7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -243,7 +243,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
         .filter(_ < org.apache.spark.SPARK_VERSION)
     } catch {
       // do not throw exception during object initialization.
-      case NonFatal(_) => Seq("2.3.4", "2.4.5") // A temporary fallback to use a specific version
+      case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version
     }
   }

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (7a17158 -> 0696f046)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 7a17158 [SPARK-32868][SQL] Add more order irrelevant aggregates to EliminateSorts add 0696f046 [SPARK-32876][SQL] Change default fallback versions to 3.0.1 and 2.4.7 in HiveExternalCatalogVersionsSuite No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (5e82548 -> 7a17158)
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 5e82548 [SPARK-32844][SQL] Make `DataFrameReader.table` take the specified options for datasource v1 add 7a17158 [SPARK-32868][SQL] Add more order irrelevant aggregates to EliminateSorts No new revisions were added by this update. Summary of changes: .../apache/spark/sql/catalyst/dsl/package.scala| 6 + .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../catalyst/optimizer/EliminateSortsSuite.scala | 26 -- 3 files changed, 26 insertions(+), 8 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (978f531 -> 5e82548)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 978f531 [SPARK-32854][SS] Minor code and doc improvement for stream-stream join add 5e82548 [SPARK-32844][SQL] Make `DataFrameReader.table` take the specified options for datasource v1 No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/catalog/SessionCatalog.scala | 7 +++-- .../spark/sql/catalyst/catalog/interface.scala | 5 +++- .../org/apache/spark/sql/internal/SQLConf.scala| 12 .../execution/datasources/DataSourceStrategy.scala | 21 +++-- .../execution/datasources/DataSourceUtils.scala| 34 ++ .../sql/test/DataFrameReaderWriterSuite.scala | 22 +- 7 files changed, 89 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (b121f0d -> 978f531)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b121f0d [SPARK-32873][BUILD] Fix code which causes error when build with sbt and Scala 2.13 add 978f531 [SPARK-32854][SS] Minor code and doc improvement for stream-stream join No new revisions were added by this update. Summary of changes: .../streaming/StreamingSymmetricHashJoinExec.scala | 56 -- 1 file changed, 30 insertions(+), 26 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
new d33052b [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2

d33052b is described below

commit d33052b8456818a71724638248cfb91f1aacf1d4
Author: mingjial
AuthorDate: Mon Sep 14 15:51:13 2020 +0800

[SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2

### What changes were proposed in this pull request?

Override the doCanonicalize function of class DataSourceV2ScanExec.

### Why are the changes needed?

A query plan using DataSourceV2 fails to reuse any exchange. This change makes DataSourceV2 plans better optimized and able to reuse exchanges the way DataSourceV1 and Parquet plans do.

Direct reason: the equals function of DataSourceV2ScanExec returns false when comparing the same V2 scans (same table, outputs, and pushed filters).

Actual cause: with the query plan's default doCanonicalize function, the pushed filters of DataSourceV2ScanExec are not canonicalized correctly. Essentially, the expression ids of predicate columns are not normalized. [Spark 32708](https://issues.apache.org/jira/browse/SPARK-32708#) was not caught by my [tests](https://github.com/apache/spark/blob/5b1b9b39eb612cbf9ec67efd4e364adafcff66c4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L392) previously added for [SPARK-32609] because the issue above happens only when the same filtered columns have different expression ids (e.g., joining table t1 with t1).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test added.

Closes #29564 from mingjialiu/branch-2.4.
Authored-by: mingjial Signed-off-by: Gengliang Wang --- .../datasources/v2/DataSourceV2ScanExec.scala | 12 +++ .../spark/sql/sources/v2/DataSourceV2Suite.scala | 23 ++ 2 files changed, 35 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 9b70eec..2fe563a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} @@ -52,6 +53,17 @@ case class DataSourceV2ScanExec( case _ => false } + override def doCanonicalize(): DataSourceV2ScanExec = { +DataSourceV2ScanExec( + output.map(QueryPlan.normalizeExprId(_, output)), + source, + options, + QueryPlan.normalizePredicates( +pushedFilters, +AttributeSeq(pushedFilters.flatMap(_.references).distinct)), + reader) + } + override def hashCode(): Int = { Seq(output, source, options).hashCode() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index ef0a8bd..92e0ac9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -393,6 +393,29 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } 
} } + + test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization ") { +def getV2ScanExec(query: DataFrame): DataSourceV2ScanExec = { + query.queryExecution.executedPlan.collect { +case d: DataSourceV2ScanExec => d + }.head +} + +val df1 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load() +val q1 = df1.select('i).filter('i > 6) +val df2 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load() +val q2 = df2.select('i).filter('i > 6) +val scan1 = getV2ScanExec(q1) +val scan2 = getV2ScanExec(q2) +assert(scan1.sameResult(scan2)) +assert(scan1.doCanonicalize().equals(scan2.doCanonicalize())) + +val q3 = df2.select('i).filter('i > 5) +val scan3 = getV2ScanExec(q3) +assert(!scan1.sameResult(scan3)) +assert(!scan1.doCanonicalize().equals(scan3.doCanonicalize())) + } + } class
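The fix above hinges on one idea: Catalyst attributes carry globally unique expression IDs, so two structurally identical scans built from separate DataFrames never compare equal until those IDs are normalized to a stable form (their position in the scan's output). The following is a minimal, self-contained sketch of that idea in Python, using toy stand-in classes (`Attribute`, `GreaterThan`, `Scan`, `canonicalize`) that are illustrative inventions, not Spark's real API:

```python
from dataclasses import dataclass

# Toy model of Catalyst canonicalization (hypothetical classes, not Spark's):
# each attribute carries a globally unique expression ID, so two textually
# identical plans built separately differ until the IDs are normalized.

@dataclass(frozen=True)
class Attribute:
    name: str
    expr_id: int  # globally unique at construction time

@dataclass(frozen=True)
class GreaterThan:
    attr: Attribute
    value: int

@dataclass(frozen=True)
class Scan:
    output: tuple
    pushed_filters: tuple

def canonicalize(scan: Scan) -> Scan:
    # Replace each expression ID with the attribute's ordinal position in the
    # scan's output, mirroring what QueryPlan.normalizeExprId /
    # QueryPlan.normalizePredicates do for output and pushed filters.
    position = {a.expr_id: i for i, a in enumerate(scan.output)}
    norm_out = tuple(Attribute(a.name, position[a.expr_id]) for a in scan.output)
    norm_filters = tuple(
        GreaterThan(Attribute(f.attr.name, position[f.attr.expr_id]), f.value)
        for f in scan.pushed_filters
    )
    return Scan(norm_out, norm_filters)

# Two scans of the same table: same column name, different expression IDs.
i1, i2 = Attribute("i", 101), Attribute("i", 202)
scan1 = Scan((i1,), (GreaterThan(i1, 6),))
scan2 = Scan((i2,), (GreaterThan(i2, 6),))

assert scan1 != scan2                              # raw plans differ by ID only
assert canonicalize(scan1) == canonicalize(scan2)  # normalized plans match

# A genuinely different filter must still compare unequal after normalization.
scan3 = Scan((i2,), (GreaterThan(i2, 5),))
assert canonicalize(scan1) != canonicalize(scan3)
```

This mirrors the shape of the unit test in the commit: two reads of the same source with the same filter canonicalize to equal plans (enabling exchange reuse), while a different filter value still distinguishes them.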
[spark] branch branch-2.4 updated (4072665 -> d33052b)
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a change to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git. from 4072665 [SPARK-32865][DOC] python section in quickstart page doesn't display SPARK_VERSION correctly add d33052b [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2 No new revisions were added by this update. Summary of changes: .../datasources/v2/DataSourceV2ScanExec.scala | 12 +++ .../spark/sql/sources/v2/DataSourceV2Suite.scala | 23 ++ 2 files changed, 35 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (742fcff -> b121f0d)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 742fcff [SPARK-32839][WINDOWS] Make Spark scripts working with the spaces in paths on Windows add b121f0d [SPARK-32873][BUILD] Fix code which causes error when build with sbt and Scala 2.13 No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 4 ++-- mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala | 2 +- mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala | 6 +++--- mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 +- .../scala/org/apache/spark/sql/execution/command/commands.scala | 4 ++-- .../apache/spark/sql/execution/datasources/v2/V2CommandExec.scala | 2 +- .../spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala | 2 +- .../execution/streaming/state/SymmetricHashJoinStateManager.scala | 4 ++-- .../apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala| 2 +- .../org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 2 +- 10 files changed, 15 insertions(+), 15 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org