This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4e4b940 [Improvement] Add RssUtils#cloneBitMap() (#103)
4e4b940 is described below
commit 4e4b9400940fa4ed53af370f7bb54d85861ddbcc
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Jul 29 16:49:55 2022 +0800
[Improvement] Add RssUtils#cloneBitMap() (#103)
### What changes were proposed in this pull request?
1. Add `RssUtils#cloneBitMap()`.
2. Replace `deserializeBitMap(serializeBitMap(bitmap))` by
`cloneBitMap(bitmap)`.
### Why are the changes needed?
1. No need to handle `IOException`.
2. More efficient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test `RssUtilsTest#testCloneBitmap()`
---
.../apache/uniffle/client/impl/ShuffleReadClientImpl.java | 13 ++-----------
.../main/java/org/apache/uniffle/common/util/RssUtils.java | 6 ++++++
.../java/org/apache/uniffle/common/util/RssUtilsTest.java | 9 +++++++++
.../org/apache/uniffle/test/SparkClientWithLocalTest.java | 2 +-
.../java/org/apache/uniffle/server/ShuffleTaskManager.java | 7 ++-----
5 files changed, 20 insertions(+), 17 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index fc8b4d3..5059f87 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.client.impl;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
@@ -110,11 +109,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
}
// copy blockIdBitmap to track all pending blocks
- try {
- pendingBlockIds =
RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
- } catch (IOException ioe) {
- throw new RuntimeException("Can't create pending blockIds.", ioe);
- }
+ pendingBlockIds = RssUtils.cloneBitMap(blockIdBitmap);
clientReadHandler =
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
}
@@ -213,11 +208,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
@Override
public void checkProcessedBlockIds() {
Roaring64NavigableMap cloneBitmap;
- try {
- cloneBitmap =
RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
- } catch (IOException ioe) {
- throw new RuntimeException("Can't validate processed blockIds.", ioe);
- }
+ cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
cloneBitmap.and(processedBlockIds);
if (!blockIdBitmap.equals(cloneBitmap)) {
throw new RssException("Blocks read inconsistent: expected " +
blockIdBitmap.getLongCardinality()
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 2219aba..8b7f500 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -172,6 +172,12 @@ public class RssUtils {
return bitmap;
}
+ public static Roaring64NavigableMap cloneBitMap(Roaring64NavigableMap
bitmap) {
+ Roaring64NavigableMap clone = Roaring64NavigableMap.bitmapOf();
+ clone.or(bitmap);
+ return clone;
+ }
+
public static List<ShuffleDataSegment> transIndexDataToSegments(
ShuffleIndexResult shuffleIndexResult, int readBufferSize) {
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 6d8292e..3c946eb 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.ShuffleIndexResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -88,6 +89,14 @@ public class RssUtilsTest {
assertEquals(Roaring64NavigableMap.bitmapOf(),
RssUtils.deserializeBitMap(new byte[]{}));
}
+ @Test
+ public void testCloneBitmap() {
+ Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(1, 2, 100,
10000);
+ Roaring64NavigableMap bitmap2 = RssUtils.cloneBitMap(bitmap1);
+ assertNotSame(bitmap1, bitmap2);
+ assertEquals(bitmap1, bitmap2);
+ }
+
@Test
public void testShuffleIndexSegment() {
ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult();
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index b0e68d2..4c4e190 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -318,7 +318,7 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
ShuffleReadClientImpl readClient;
createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
- Roaring64NavigableMap beforeAdded =
RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
+ Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
// write data by another task, read data again, the cache for index file
should be updated
blocks = createShuffleBlockList(
0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 0876c51..3243c2d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -170,11 +170,9 @@ public class ShuffleTaskManager {
if (System.currentTimeMillis() - start > commitTimeout) {
throw new RuntimeException("Shuffle data commit timeout for " +
commitTimeout + " ms");
}
- byte[] bitmapBytes;
synchronized (cachedBlockIds) {
- bitmapBytes = RssUtils.serializeBitMap(cachedBlockIds);
+ cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
}
- cloneBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
long expectedCommitted = cloneBlockIds.getLongCardinality();
shuffleBufferManager.commitShuffleTask(appId, shuffleId);
Roaring64NavigableMap committedBlockIds;
@@ -183,9 +181,8 @@ public class ShuffleTaskManager {
while (true) {
committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId,
shuffleId);
synchronized (committedBlockIds) {
- bitmapBytes = RssUtils.serializeBitMap(committedBlockIds);
+ cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
}
- cloneCommittedBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
cloneBlockIds.andNot(cloneCommittedBlockIds);
if (cloneBlockIds.isEmpty()) {
break;