This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 45e600ca Revert "[Improvement] Skip blocks when read from memory (#294)" (#403)
45e600ca is described below
commit 45e600ca951d26a1c3655c873f8af7a9423cebf2
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Dec 12 22:15:24 2022 +0800
Revert "[Improvement] Skip blocks when read from memory (#294)" (#403)
This reverts commit 55191c43cc94dc0e72ca378e3c65b743bf66bbb0.
### What changes were proposed in this pull request?
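Revert #294, removing the client-side `BlockSkipStrategy` option (`rss.client.read.block.skip.strategy`) and its BLOCKID_RANGE filtering of in-memory shuffle data, and restoring the previous expected-task-ID bitmap filter.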
### Why are the changes needed?
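Block IDs are not contiguous. Once the expected block IDs are merged into a small, bounded number of ranges (at most `rss.client.read.block.skip.range.segments.max`, default 10), those ranges can also cover many blocks the reader does not expect. BLOCKID_RANGE is therefore not a good way to filter in-memory data.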
### Does this PR introduce _any_ user-facing change?
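No, apart from removing the client options `rss.client.read.block.skip.strategy` and `rss.client.read.block.skip.range.segments.max` that were introduced by #294.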
### How was this patch tested?
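No new tests are needed: this is a pure revert, and the tests added by #294 are removed together with the feature.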
---
.../org/apache/hadoop/mapreduce/RssMRConfig.java | 10 ---
.../hadoop/mapreduce/task/reduce/RssShuffle.java | 11 +--
.../org/apache/spark/shuffle/RssSparkConfig.java | 6 --
.../spark/shuffle/reader/RssShuffleReader.java | 15 +---
.../spark/shuffle/reader/RssShuffleReader.java | 21 ++---
.../client/factory/ShuffleClientFactory.java | 3 +-
.../uniffle/client/impl/ShuffleReadClientImpl.java | 11 +--
.../request/CreateShuffleReadClientRequest.java | 38 +++------
.../uniffle/client/util/RssClientConfig.java | 9 +--
.../apache/uniffle/common/BlockSkipStrategy.java | 24 ------
.../org/apache/uniffle/common/util/RssUtils.java | 92 ---------------------
.../apache/uniffle/common/util/RssUtilsTest.java | 94 ----------------------
.../uniffle/test/ShuffleServerWithMemoryTest.java | 62 --------------
...SkewedJoinWithBlockIdRangeSkipStrategyTest.java | 32 --------
.../client/impl/grpc/ShuffleServerGrpcClient.java | 1 -
.../request/RssGetInMemoryShuffleDataRequest.java | 11 +--
proto/src/main/proto/Rss.proto | 1 -
.../uniffle/server/ShuffleServerGrpcService.java | 4 +-
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +-
.../uniffle/server/buffer/ShuffleBuffer.java | 42 +++-------
.../server/buffer/ShuffleBufferManager.java | 6 +-
.../server/buffer/ShuffleBufferManagerTest.java | 33 ++------
.../uniffle/server/buffer/ShuffleBufferTest.java | 79 ++++++++----------
.../storage/factory/ShuffleHandlerFactory.java | 5 +-
.../handler/impl/MemoryClientReadHandler.java | 35 ++------
.../request/CreateShuffleReadHandlerRequest.java | 20 ++---
26 files changed, 90 insertions(+), 579 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index f37c6e06..f9feb85f 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -179,16 +179,6 @@ public class RssMRConfig {
public static final int
RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
- public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY =
- MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY;
- public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE
=
- RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE;
-
- public static final String RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS =
- MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS;
- public static final int
RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE =
- RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE;
-
public static final String RSS_CONF_FILE = "rss_conf.xml";
public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 32c14b06..e5af9795 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -42,7 +42,6 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.UnitConverter;
@@ -83,8 +82,6 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
private int readBufferSize;
private RemoteStorageInfo remoteStorageInfo;
private int appAttemptId;
- private BlockSkipStrategy blockSkipStrategy;
- private int maxBlockIdRangeSegments;
@Override
public void init(ShuffleConsumerPlugin.Context context) {
@@ -112,12 +109,6 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_DATA_REPLICA,
RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
- blockSkipStrategy =
BlockSkipStrategy.valueOf(RssMRUtils.getString(rssJobConf, mrJobConf,
- RssMRConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
- RssMRConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
- maxBlockIdRangeSegments = RssMRUtils.getInt(rssJobConf, mrJobConf,
- RssMRConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
- RssMRConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
this.partitionNum = mrJobConf.getNumReduceTasks();
this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
@@ -203,7 +194,7 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath,
indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
serverInfoList,
- readerJobConf, new MRIdHelper(), blockSkipStrategy,
maxBlockIdRangeSegments);
+ readerJobConf, new MRIdHelper());
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus,
merger, copyPhase, reporter, metrics,
shuffleReadClient, blockIdBitmap.getLongCardinality(),
RssMRConfig.toRssConf(rssJobConf));
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 958a05df..9da401d2 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -265,12 +265,6 @@ public class RssSparkConfig {
+ " spark.rss.estimate.server.assignment.enabled"))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
- public static final ConfigEntry<String> RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY
= createStringBuilder(
- new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY)
- .doc("The strategy for skip block when read from memory."))
-
.createWithDefault(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE);
-
-
public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
ImmutableSet.of(RSS_STORAGE_TYPE.key(), RSS_REMOTE_STORAGE_PATH.key());
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index b08d01bf..cc6fe254 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -45,8 +45,6 @@ import scala.runtime.BoxedUnit;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
-import org.apache.uniffle.client.util.RssClientConfig;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
@@ -73,8 +71,6 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
private RssConf rssConf;
- private final BlockSkipStrategy blockSkipStrategy;
- private final int maxBlockIdRangeSegments;
public RssShuffleReader(
int startPartition,
@@ -111,14 +107,6 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
this.shuffleServerInfoList =
(List<ShuffleServerInfo>)
(rssShuffleHandle.getPartitionToServers().get(startPartition));
this.rssConf = rssConf;
-
- BlockSkipStrategy blockSkipStrategy = BlockSkipStrategy.valueOf(
- rssConf.getString(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
-
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
- this.blockSkipStrategy = shuffleServerInfoList.size() <= 1 ?
BlockSkipStrategy.NONE : blockSkipStrategy;
-
- maxBlockIdRangeSegments =
rssConf.getInteger(RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
-
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
}
@Override
@@ -127,8 +115,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
appId, shuffleId, startPartition, storageType, basePath,
indexReadLimit, readBufferSize,
- partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList, hadoopConf,
- blockSkipStrategy, maxBlockIdRangeSegments);
+ partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList, hadoopConf);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator rssShuffleDataIterator = new
RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 31101262..89e17664 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -49,8 +49,6 @@ import scala.runtime.BoxedUnit;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
-import org.apache.uniffle.client.util.RssClientConfig;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
@@ -58,6 +56,7 @@ import org.apache.uniffle.common.config.RssConf;
public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
private static final Logger LOG =
LoggerFactory.getLogger(RssShuffleReader.class);
private final Map<Integer, List<ShuffleServerInfo>>
partitionToShuffleServers;
+
private String appId;
private int shuffleId;
private int startPartition;
@@ -79,8 +78,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
private ShuffleReadMetrics readMetrics;
private RssConf rssConf;
private ShuffleDataDistributionType dataDistributionType;
- private final BlockSkipStrategy blockSkipStrategy;
- private final int maxBlockIdRangeSegments;
+ private boolean expectedTaskIdsBitmapFilterEnable;
public RssShuffleReader(
int startPartition,
@@ -122,11 +120,9 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers();
this.rssConf = rssConf;
this.dataDistributionType = dataDistributionType;
- blockSkipStrategy = BlockSkipStrategy.valueOf(
- rssConf.getString(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
-
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
- maxBlockIdRangeSegments =
rssConf.getInteger(RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
-
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
+ // This mechanism of expectedTaskIdsBitmap filter is to filter out the
most of data.
+ // especially for AQE skew optimization
+ this.expectedTaskIdsBitmapFilterEnable = !(mapStartIndex == 0 &&
mapEndIndex == Integer.MAX_VALUE);
}
@Override
@@ -211,15 +207,10 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
continue;
}
List<ShuffleServerInfo> shuffleServerInfoList =
partitionToShuffleServers.get(partition);
- // If AQE is disable and the number of replica is 1, we should set
BlockSkipStrategy to NONE
- // for reduce data transmission
- BlockSkipStrategy realBlockSkipStrategy = shuffleServerInfoList.size()
<= 1
- && mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE
- ? BlockSkipStrategy.NONE : blockSkipStrategy;
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
appId, shuffleId, partition, storageType, basePath,
indexReadLimit, readBufferSize,
1, partitionNum, partitionToExpectBlocks.get(partition),
taskIdBitmap, shuffleServerInfoList,
- hadoopConf, dataDistributionType, realBlockSkipStrategy,
maxBlockIdRangeSegments);
+ hadoopConf, dataDistributionType,
expectedTaskIdsBitmapFilterEnable);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index caaa14c5..39d9ded0 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -87,8 +87,7 @@ public class ShuffleClientFactory {
request.getHadoopConf(),
request.getIdHelper(),
request.getShuffleDataDistributionType(),
- request.getBlockSkipStrategy(),
- request.getMaxBlockIdRangeSegments()
+ request.isExpectedTaskIdsBitmapFilterEnable()
);
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 333fe527..84758c7b 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.IdHelper;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
@@ -79,8 +78,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType,
- BlockSkipStrategy blockSkipStrategy,
- int maxBlockIdRangeSegments) {
+ boolean expectedTaskIdsBitmapFilterEnable) {
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.blockIdBitmap = blockIdBitmap;
@@ -104,10 +102,9 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
request.setProcessBlockIds(processedBlockIds);
request.setDistributionType(dataDistributionType);
request.setExpectTaskIds(taskIdBitmap);
- if (BlockSkipStrategy.BLOCKID_RANGE.equals(blockSkipStrategy)) {
- request.setMaxBlockIdRangeSegments(maxBlockIdRangeSegments);
+ if (expectedTaskIdsBitmapFilterEnable) {
+ request.useExpectedTaskIdsBitmapFilter();
}
- request.setBlockSkipStrategy(blockSkipStrategy);
List<Long> removeBlockIds = Lists.newArrayList();
blockIdBitmap.forEach(bid -> {
@@ -144,7 +141,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
this(storageType, appId, shuffleId, partitionId, indexReadLimit,
partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
- idHelper, ShuffleDataDistributionType.NORMAL,
BlockSkipStrategy.TASK_BITMAP, 0);
+ idHelper, ShuffleDataDistributionType.NORMAL, false);
}
@Override
diff --git
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
index 3362245b..db050304 100644
---
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
@@ -24,7 +24,6 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.client.util.IdHelper;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -45,8 +44,7 @@ public class CreateShuffleReadClientRequest {
private Configuration hadoopConf;
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType =
ShuffleDataDistributionType.NORMAL;
- private BlockSkipStrategy blockSkipStrategy;
- private int maxBlockIdRangeSegments;
+ private boolean expectedTaskIdsBitmapFilterEnable = false;
public CreateShuffleReadClientRequest(
String appId,
@@ -63,12 +61,12 @@ public class CreateShuffleReadClientRequest {
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
ShuffleDataDistributionType dataDistributionType,
- BlockSkipStrategy blockSkipStrategy,
- int maxBlockIdRangeSegments) {
+ boolean expectedTaskIdsBitmapFilterEnable) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit,
readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList,
- hadoopConf, new DefaultIdHelper(), blockSkipStrategy,
maxBlockIdRangeSegments);
+ hadoopConf, new DefaultIdHelper());
this.shuffleDataDistributionType = dataDistributionType;
+ this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
}
public CreateShuffleReadClientRequest(
@@ -84,12 +82,10 @@ public class CreateShuffleReadClientRequest {
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- BlockSkipStrategy blockSkipStrategy,
- int maxBlockIdRangeSegments) {
+ Configuration hadoopConf) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit,
readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList,
- hadoopConf, new DefaultIdHelper(), blockSkipStrategy,
maxBlockIdRangeSegments);
+ hadoopConf, new DefaultIdHelper());
}
public CreateShuffleReadClientRequest(
@@ -106,9 +102,7 @@ public class CreateShuffleReadClientRequest {
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
- IdHelper idHelper,
- BlockSkipStrategy blockSkipStrategy,
- int maxBlockIdRangeSegments) {
+ IdHelper idHelper) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
@@ -123,8 +117,6 @@ public class CreateShuffleReadClientRequest {
this.shuffleServerInfoList = shuffleServerInfoList;
this.hadoopConf = hadoopConf;
this.idHelper = idHelper;
- this.blockSkipStrategy = blockSkipStrategy;
- this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
}
public String getAppId() {
@@ -187,19 +179,7 @@ public class CreateShuffleReadClientRequest {
return shuffleDataDistributionType;
}
- public int getMaxBlockIdRangeSegments() {
- return maxBlockIdRangeSegments;
- }
-
- public void setMaxBlockIdRangeSegments(int maxBlockIdRangeSegments) {
- this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
- }
-
- public BlockSkipStrategy getBlockSkipStrategy() {
- return blockSkipStrategy;
- }
-
- public void setBlockSkipStrategy(BlockSkipStrategy blockSkipStrategy) {
- this.blockSkipStrategy = blockSkipStrategy;
+ public boolean isExpectedTaskIdsBitmapFilterEnable() {
+ return expectedTaskIdsBitmapFilterEnable;
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index fa8375a2..5ab3a6fb 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -61,7 +61,7 @@ public class RssClientConfig {
// The tags specified by rss client to determine server assignment.
public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
"rss.client.assignment.tags";
public static final String RSS_TEST_MODE_ENABLE = "rss.test.mode.enable";
-
+
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
"rss.client.assignment.retry.interval";
public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE
= 65000;
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
"rss.client.assignment.retry.times";
@@ -86,11 +86,4 @@ public class RssClientConfig {
public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
"rss.estimate.task.concurrency.per.server";
public static final int
RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = 80;
- public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY =
"rss.client.read.block.skip.strategy";
-
- public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE
= "TASK_BITMAP";
-
- public static final String RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS =
- "rss.client.read.block.skip.range.segments.max";
- public static final int
RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE = 10;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/BlockSkipStrategy.java
b/common/src/main/java/org/apache/uniffle/common/BlockSkipStrategy.java
deleted file mode 100644
index a07ad4e6..00000000
--- a/common/src/main/java/org/apache/uniffle/common/BlockSkipStrategy.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.common;
-
-public enum BlockSkipStrategy {
- NONE,
- TASK_BITMAP,
- BLOCKID_RANGE
-}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index fb4d8bf5..fb05894e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,8 +31,6 @@ import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@@ -40,13 +38,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
-import org.apache.commons.lang3.tuple.Pair;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -287,92 +283,4 @@ public class RssUtils {
+ " blocks, actual " + cloneBitmap.getLongCardinality() + " blocks");
}
}
-
- /**
- * Generate range segments for blockId bitmap
- * @param blockIdBitmap blockId bitmap
- * @param maxSegments Maximum number of segments to return
- * @return Range segments.like [start1, end1, start2, end2]
- */
- public static List<Long> generateRangeSegments(Roaring64NavigableMap
blockIdBitmap, int maxSegments) {
- Iterator<Long> iterator = blockIdBitmap.iterator();
- if (!iterator.hasNext()) {
- return Collections.EMPTY_LIST;
- }
- List<Long> endPoints = Lists.newArrayList();
- long lastId = iterator.next();
- endPoints.add(lastId);
- while (iterator.hasNext()) {
- long blockId = iterator.next();
- try {
- if (blockId - lastId <= 1) {
- continue;
- }
- endPoints.add(lastId);
- endPoints.add(blockId);
- } finally {
- lastId = blockId;
- }
- }
- if (endPoints.size() % 2 != 0) {
- endPoints.add(lastId);
- }
- return mergeRangeSegments(endPoints, maxSegments);
- }
-
- /**
- * Merge range segments
- * @param endPoints EndPoints of all segments.
- * @param maxSegments Maximum number of segments to return
- * @return Merged segments
- */
- public static List<Long> mergeRangeSegments(List<Long> endPoints, int
maxSegments) {
- int maxPoints = maxSegments * 2;
- if (endPoints.size() < 2 || endPoints.size() <= maxPoints) {
- return endPoints;
- }
- // distance of two segments -> the offset of the second segment
- List<Pair<Long, Integer>> distinces = Lists.newArrayList();
- for (int i = 2; i < endPoints.size(); i += 2) {
- long distance = endPoints.get(i) - endPoints.get(i - 1);
- distinces.add(Pair.of(distance, i));
- }
- distinces.sort(Comparator.comparingLong(Pair::getLeft));
-
- int mergeSegmentNum = endPoints.size() / 2 - maxSegments;
- // Find the nearest segments top N(mergeSegmentNum)
- List<Integer> indexsToMerge = distinces.stream().limit(mergeSegmentNum)
- .map(e ->
e.getValue()).sorted(Comparator.reverseOrder()).collect(Collectors.toList());
- // Merge segments
- for (Integer index : indexsToMerge) {
- // Don't remove (int), or remove(Object o) will be invoked;
- endPoints.remove((int)index);
- endPoints.remove(index - 1);
- }
- return endPoints;
- }
-
- /**
- * Check if the gived blockId inside the range segments. Use binary search
- * @param rangeSegments Range segments.like [start1, end1, start2, end2]
- * @param blockId BlockId to check
- * @return true if blockId inside the range segments
- */
- public static boolean checkIfBlockInRange(List<Long> rangeSegments, Long
blockId) {
- int lower = 0;
- int upper = rangeSegments.size() - 1;
- Comparator<Long> comparator = Comparator.naturalOrder();
- while (lower <= upper) {
- int middle = (lower + upper) >>> 1;
- int c = comparator.compare(blockId, rangeSegments.get(middle));
- if (c < 0) {
- upper = middle - 1;
- } else if (c > 0) {
- lower = middle + 1;
- } else {
- return true;
- }
- }
- return lower % 2 != 0;
- }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 36422d89..a84e3902 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -21,7 +21,6 @@ import java.lang.reflect.Field;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,7 +38,6 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -178,98 +176,6 @@ public class RssUtilsTest {
assertEquals(serverToPartitions.get(server4), Sets.newHashSet(2, 4));
}
- @Test
- public void testMergeRangeSegments() {
- List<Long> endPoints = Lists.newArrayList(1L, 2L, 5L, 6L);
- List<Long> results = RssUtils.mergeRangeSegments(endPoints, 1);
- ArrayList<Long> expectResults = Lists.newArrayList(1L, 6L);
- assertIterableEquals(results, expectResults);
-
- endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 8L, 10L);
- results = RssUtils.mergeRangeSegments(endPoints, 2);
- expectResults = Lists.newArrayList(1L, 2L, 5L, 10L);
- assertIterableEquals(results, expectResults);
-
- endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 12L);
- results = RssUtils.mergeRangeSegments(endPoints, 2);
- expectResults = Lists.newArrayList(1L, 6L, 10L, 12L);
- assertIterableEquals(results, expectResults);
-
- endPoints = Lists.newArrayList(1L, 1L, 3L, 3L, 5L, 5L);
- results = RssUtils.mergeRangeSegments(endPoints, 2);
- expectResults = Lists.newArrayList(1L, 3L, 5L, 5L);
- assertIterableEquals(results, expectResults);
- }
-
- @Test
- public void testGenerateRangeSegments() {
- Random random = new Random();
- Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
- for (int i = 0; i < 1000; i++) {
- if (random.nextInt(10) < 3) {
- continue;
- }
- bitmap.add(i);
- }
- int maxSegments = 3;
- List<Long> segments = RssUtils.generateRangeSegments(bitmap, 3);
- assertEquals(maxSegments * 2, segments.size());
-
- List<Long> endPoints = Lists.newArrayList(1L, 2L, 5L, 6L);
- bitmap = Roaring64NavigableMap.bitmapOf();
- for (Long endPoint : endPoints) {
- bitmap.add(endPoint);
- }
- List<Long> results = RssUtils.generateRangeSegments(bitmap, 100);
- List<Long> expectResults = Lists.newArrayList(1L, 2L, 5L, 6L);
- assertIterableEquals(results, expectResults);
-
- bitmap = Roaring64NavigableMap.bitmapOf();
- endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 8L, 9L, 10L);
- for (Long endPoint : endPoints) {
- bitmap.add(endPoint);
- }
- results = RssUtils.generateRangeSegments(bitmap, 100);
- expectResults = Lists.newArrayList(1L, 2L, 5L, 6L, 8L, 10L);
- assertIterableEquals(results, expectResults);
-
- bitmap = Roaring64NavigableMap.bitmapOf();
- endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 11L, 12L);
- for (Long endPoint : endPoints) {
- bitmap.add(endPoint);
- }
- results = RssUtils.generateRangeSegments(bitmap, 100);
- expectResults = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 12L);
- assertIterableEquals(results, expectResults);
-
- bitmap = Roaring64NavigableMap.bitmapOf();
- endPoints = Lists.newArrayList(1L, 1L, 3L, 3L, 5L, 5L);
- for (Long endPoint : endPoints) {
- bitmap.add(endPoint);
- }
- results = RssUtils.generateRangeSegments(bitmap, 100);
- expectResults = Lists.newArrayList(1L, 1L, 3L, 3L, 5L, 5L);
- assertIterableEquals(results, expectResults);
- }
-
- @Test
- public void testCheckIfBlockInRange() {
- List<Long> rangeSegments = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 12L);
- Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
- for (Long element : rangeSegments) {
- bitmap.add(element);
- }
- assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 13L));
- assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 10L));
- assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 9L));
- assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 5L));
- assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 4L));
- assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 3L));
- assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 2L));
- assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 1L));
- assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 0L));
- }
-
// Copy from ClientUtils
private Long getBlockId(long partitionId, long taskAttemptId, long
atomicInt) {
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 551b9be6..accc280f 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -34,7 +34,6 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -307,67 +306,6 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
assertNull(sdr);
}
-
- @Test
- public void memoryReadTestWithSkipByBlockIdRange() throws Exception {
- String testAppId = "memoryReadTestWithSkipByBlockIdRange";
- int shuffleId = 0;
- int partitionId = 0;
- RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId,
0,
- Lists.newArrayList(new PartitionRange(0, 0)), "");
- shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Map<Long, byte[]> dataMap = Maps.newHashMap();
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
- bitmaps[0] = Roaring64NavigableMap.bitmapOf();
- List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- shuffleId, partitionId, 0, 3, 25,
- expectBlockIds, dataMap, mockSSI);
- Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
- partitionToBlocks.put(partitionId, blocks);
- Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks =
Maps.newHashMap();
- shuffleToBlocks.put(shuffleId, partitionToBlocks);
-
- // send data to shuffle server
- RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
- testAppId, 3, 1000, shuffleToBlocks);
- shuffleServerClient.sendShuffleData(rssdr);
-
- // data is cached
- assertEquals(3, shuffleServers.get(0).getShuffleBufferManager()
- .getShuffleBuffer(testAppId, shuffleId, 0).getBlocks().size());
-
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-
- MemoryClientReadHandler memoryClientReadHandler = new
MemoryClientReadHandler(
- testAppId, shuffleId, partitionId, 20, shuffleServerClient, null,
- expectBlockIds, processBlockIds, BlockSkipStrategy.BLOCKID_RANGE, 3);
-
- ShuffleDataResult sdr = memoryClientReadHandler.readShuffleData();
- Map<Long, byte[]> expectedData = Maps.newHashMap();
- expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
- validateResult(expectedData, sdr);
-
- // skip the first block
- processBlockIds.add(blocks.get(0).getBlockId());
- MemoryClientReadHandler memoryClientReadHandler2 = new
MemoryClientReadHandler(
- testAppId, shuffleId, partitionId, 20, shuffleServerClient, null,
- expectBlockIds, processBlockIds, BlockSkipStrategy.BLOCKID_RANGE, 3);
- sdr = memoryClientReadHandler2.readShuffleData();
- expectedData = Maps.newHashMap();
- expectedData.put(blocks.get(1).getBlockId(), blocks.get(1).getData());
- validateResult(expectedData, sdr);
-
- // skip all blocks
- blocks.forEach((block) -> processBlockIds.add(block.getBlockId()));
- MemoryClientReadHandler memoryClientReadHandler3 = new
MemoryClientReadHandler(
- testAppId, shuffleId, partitionId, 20, shuffleServerClient, null,
- expectBlockIds, processBlockIds, BlockSkipStrategy.BLOCKID_RANGE, 3);
- sdr = memoryClientReadHandler3.readShuffleData();
- assertNull(sdr);
- }
-
-
protected void validateResult(
Map<Long, byte[]> expectedData,
ShuffleDataResult sdr) {
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithBlockIdRangeSkipStrategyTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithBlockIdRangeSkipStrategyTest.java
deleted file mode 100644
index de8d16e8..00000000
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithBlockIdRangeSkipStrategyTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.RssSparkConfig;
-
-import org.apache.uniffle.common.BlockSkipStrategy;
-
-public class AQESkewedJoinWithBlockIdRangeSkipStrategyTest extends
AQESkewedJoinTest {
-
- @Override
- public void updateSparkConfCustomer(SparkConf sparkConf) {
- super.updateSparkConfCustomer(sparkConf);
- sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY.key(),
BlockSkipStrategy.BLOCKID_RANGE.name());
- }
-}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 7f1b4d41..74fe2767 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -619,7 +619,6 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
.setLastBlockId(request.getLastBlockId())
.setReadBufferSize(request.getReadBufferSize())
.setSerializedExpectedTaskIdsBitmap(serializedTaskIdsBytes)
- .addAllExpectedBlockIdRange(request.getExpectedBlockIdRange())
.setTimestamp(start)
.build();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
index b3ff0d82..87c3d2f1 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
@@ -17,8 +17,6 @@
package org.apache.uniffle.client.request;
-import java.util.List;
-
import org.roaringbitmap.longlong.Roaring64NavigableMap;
public class RssGetInMemoryShuffleDataRequest {
@@ -29,18 +27,15 @@ public class RssGetInMemoryShuffleDataRequest {
private final int readBufferSize;
private final Roaring64NavigableMap expectedTaskIds;
- private List<Long> expectedBlockIdRange;
-
public RssGetInMemoryShuffleDataRequest(
String appId, int shuffleId, int partitionId, long lastBlockId, int
readBufferSize,
- Roaring64NavigableMap expectedTaskIds, List<Long> expectedBlockIdRange) {
+ Roaring64NavigableMap expectedTaskIds) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.lastBlockId = lastBlockId;
this.readBufferSize = readBufferSize;
this.expectedTaskIds = expectedTaskIds;
- this.expectedBlockIdRange = expectedBlockIdRange;
}
public String getAppId() {
@@ -66,8 +61,4 @@ public class RssGetInMemoryShuffleDataRequest {
public Roaring64NavigableMap getExpectedTaskIds() {
return expectedTaskIds;
}
-
- public List<Long> getExpectedBlockIdRange() {
- return expectedBlockIdRange;
- }
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index e2c081a1..db3c6a07 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -93,7 +93,6 @@ message GetMemoryShuffleDataRequest {
int32 readBufferSize = 5;
int64 timestamp = 6;
optional bytes serializedExpectedTaskIdsBitmap = 7;
- repeated int64 expectedBlockIdRange = 8;
}
message GetMemoryShuffleDataResponse {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index fd100b2e..3de560ce 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -662,7 +662,6 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
request.getSerializedExpectedTaskIdsBitmap().toByteArray()
);
}
- List<Long> expectedBlockIdRange =
request.getExpectedBlockIdRangeList();
ShuffleDataResult shuffleDataResult = shuffleServer
.getShuffleTaskManager()
.getInMemoryShuffleData(
@@ -671,8 +670,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
partitionId,
blockId,
readBufferSize,
- expectedTaskIds,
- expectedBlockIdRange
+ expectedTaskIds
);
byte[] data = new byte[]{};
List<BufferSegment> bufferSegments = Lists.newArrayList();
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index c197b525..21d855f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -364,9 +364,9 @@ public class ShuffleTaskManager {
public ShuffleDataResult getInMemoryShuffleData(
String appId, Integer shuffleId, Integer partitionId, long blockId, int
readBufferSize,
- Roaring64NavigableMap expectedTaskIds, List<Long> expectedBlockIdRange) {
+ Roaring64NavigableMap expectedTaskIds) {
return shuffleBufferManager.getShuffleData(appId,
- shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds,
expectedBlockIdRange);
+ shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds);
}
public ShuffleDataResult getShuffleData(
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 187d6d65..cdd55629 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -25,7 +25,6 @@ import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.commons.collections.CollectionUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +35,6 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
@@ -151,22 +149,16 @@ public class ShuffleBuffer {
return getShuffleData(lastBlockId, readBufferSize, null);
}
- public synchronized ShuffleDataResult getShuffleData(
- long lastBlockId, int readBufferSize, Roaring64NavigableMap
expectedTaskIds) {
- return getShuffleData(lastBlockId, readBufferSize, expectedTaskIds, null);
- }
-
// 1. generate buffer segments and other info: if blockId exist, start with
which eventId
// 2. according to info from step 1, generate data
// todo: if block was flushed, it's possible to get duplicated data
public synchronized ShuffleDataResult getShuffleData(
- long lastBlockId, int readBufferSize, Roaring64NavigableMap
expectedTaskIds,
- List<Long> expectedBlockIdRange) {
+ long lastBlockId, int readBufferSize, Roaring64NavigableMap
expectedTaskIds) {
try {
List<BufferSegment> bufferSegments = Lists.newArrayList();
List<ShufflePartitionedBlock> readBlocks = Lists.newArrayList();
updateBufferSegmentsAndResultBlocks(
- lastBlockId, readBufferSize, bufferSegments, readBlocks,
expectedTaskIds, expectedBlockIdRange);
+ lastBlockId, readBufferSize, bufferSegments, readBlocks,
expectedTaskIds);
if (!bufferSegments.isEmpty()) {
int length = calculateDataLength(bufferSegments);
byte[] data = new byte[length];
@@ -188,8 +180,7 @@ public class ShuffleBuffer {
long readBufferSize,
List<BufferSegment> bufferSegments,
List<ShufflePartitionedBlock> resultBlocks,
- Roaring64NavigableMap expectedTaskIds,
- List<Long> expectedBlockIdRange) {
+ Roaring64NavigableMap expectedTaskIds) {
long nextBlockId = lastBlockId;
List<Long> sortedEventId = sortFlushingEventId();
int offset = 0;
@@ -203,11 +194,11 @@ public class ShuffleBuffer {
// update bufferSegments with different strategy according to
lastBlockId
if (nextBlockId == Constants.INVALID_BLOCK_ID) {
updateSegmentsWithoutBlockId(offset, inFlushBlockMap.get(eventId),
readBufferSize,
- bufferSegments, resultBlocks, expectedTaskIds,
expectedBlockIdRange);
+ bufferSegments, resultBlocks, expectedTaskIds);
hasLastBlockId = true;
} else {
hasLastBlockId = updateSegmentsWithBlockId(offset,
inFlushBlockMap.get(eventId),
- readBufferSize, nextBlockId, bufferSegments, resultBlocks,
expectedTaskIds, expectedBlockIdRange);
+ readBufferSize, nextBlockId, bufferSegments, resultBlocks,
expectedTaskIds);
// if last blockId is found, read from begin with next cached blocks
if (hasLastBlockId) {
// reset blockId to read from begin in next cached blocks
@@ -225,12 +216,11 @@ public class ShuffleBuffer {
// try to read from cached blocks which is not in flush queue
if (blocks.size() > 0 && offset < readBufferSize) {
if (nextBlockId == Constants.INVALID_BLOCK_ID) {
- updateSegmentsWithoutBlockId(offset, blocks, readBufferSize,
bufferSegments,
- resultBlocks, expectedTaskIds, expectedBlockIdRange);
+ updateSegmentsWithoutBlockId(offset, blocks, readBufferSize,
bufferSegments, resultBlocks, expectedTaskIds);
hasLastBlockId = true;
} else {
hasLastBlockId = updateSegmentsWithBlockId(offset, blocks,
- readBufferSize, nextBlockId, bufferSegments, resultBlocks,
expectedTaskIds, expectedBlockIdRange);
+ readBufferSize, nextBlockId, bufferSegments, resultBlocks,
expectedTaskIds);
}
}
if ((!inFlushBlockMap.isEmpty() || blocks.size() > 0) && offset == 0 &&
!hasLastBlockId) {
@@ -238,8 +228,7 @@ public class ShuffleBuffer {
// but there still has data in memory
// try read again with blockId = Constants.INVALID_BLOCK_ID
updateBufferSegmentsAndResultBlocks(
- Constants.INVALID_BLOCK_ID, readBufferSize, bufferSegments,
- resultBlocks, expectedTaskIds, expectedBlockIdRange);
+ Constants.INVALID_BLOCK_ID, readBufferSize, bufferSegments,
resultBlocks, expectedTaskIds);
}
}
@@ -281,19 +270,13 @@ public class ShuffleBuffer {
long readBufferSize,
List<BufferSegment> bufferSegments,
List<ShufflePartitionedBlock> readBlocks,
- Roaring64NavigableMap expectedTaskIds,
- List<Long> expectedBlockIdRange) {
+ Roaring64NavigableMap expectedTaskIds) {
int currentOffset = offset;
// read from first block
for (ShufflePartitionedBlock block : cachedBlocks) {
if (expectedTaskIds != null &&
!expectedTaskIds.contains(block.getTaskAttemptId())) {
continue;
}
-
- if (CollectionUtils.isNotEmpty(expectedBlockIdRange)
- && !RssUtils.checkIfBlockInRange(expectedBlockIdRange,
block.getBlockId())) {
- continue;
- }
// add bufferSegment with block
bufferSegments.add(new BufferSegment(block.getBlockId(), currentOffset,
block.getLength(),
block.getUncompressLength(), block.getCrc(),
block.getTaskAttemptId()));
@@ -314,8 +297,7 @@ public class ShuffleBuffer {
long lastBlockId,
List<BufferSegment> bufferSegments,
List<ShufflePartitionedBlock> readBlocks,
- Roaring64NavigableMap expectedTaskIds,
- List<Long> expectedBlockIdRange) {
+ Roaring64NavigableMap expectedTaskIds) {
int currentOffset = offset;
// find lastBlockId, then read from next block
boolean foundBlockId = false;
@@ -330,10 +312,6 @@ public class ShuffleBuffer {
if (expectedTaskIds != null &&
!expectedTaskIds.contains(block.getTaskAttemptId())) {
continue;
}
- if (CollectionUtils.isNotEmpty(expectedBlockIdRange)
- && !RssUtils.checkIfBlockInRange(expectedBlockIdRange,
block.getBlockId())) {
- continue;
- }
// add bufferSegment with block
bufferSegments.add(new BufferSegment(block.getBlockId(), currentOffset,
block.getLength(),
block.getUncompressLength(), block.getCrc(),
block.getTaskAttemptId()));
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index a5aa9973..366d10eb 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -158,15 +158,13 @@ public class ShuffleBufferManager {
partitionId,
blockId,
readBufferSize,
- null,
null
);
}
public ShuffleDataResult getShuffleData(
String appId, int shuffleId, int partitionId, long blockId,
- int readBufferSize, Roaring64NavigableMap expectedTaskIds,
- List<Long> expectedBlockIdRange) {
+ int readBufferSize, Roaring64NavigableMap expectedTaskIds) {
Map.Entry<Range<Integer>, ShuffleBuffer> entry = getShuffleBufferEntry(
appId, shuffleId, partitionId);
if (entry == null) {
@@ -177,7 +175,7 @@ public class ShuffleBufferManager {
if (buffer == null) {
return null;
}
- return buffer.getShuffleData(blockId, readBufferSize, expectedTaskIds,
expectedBlockIdRange);
+ return buffer.getShuffleData(blockId, readBufferSize, expectedTaskIds);
}
void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 60f39c33..1e6d056d 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.Lists;
import com.google.common.collect.RangeMap;
import com.google.common.io.Files;
import org.junit.jupiter.api.BeforeEach;
@@ -30,7 +29,6 @@ import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.ShuffleFlushManager;
@@ -124,8 +122,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
0,
Constants.INVALID_BLOCK_ID,
60,
- Roaring64NavigableMap.bitmapOf(1),
- null
+ Roaring64NavigableMap.bitmapOf(1)
);
assertEquals(1, result.getBufferSegments().size());
assertEquals(0, result.getBufferSegments().get(0).getOffset());
@@ -139,8 +136,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
0,
lastBlockId,
60,
- Roaring64NavigableMap.bitmapOf(1),
- null
+ Roaring64NavigableMap.bitmapOf(1)
);
assertEquals(1, result.getBufferSegments().size());
assertEquals(0, result.getBufferSegments().get(0).getOffset());
@@ -168,22 +164,13 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
assertEquals(100, bufferPool.get(appId).get(1).get(0).getSize());
assertEquals(200, bufferPool.get(appId).get(2).get(0).getSize());
assertEquals(100, bufferPool.get(appId).get(3).get(0).getSize());
- Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf();
- Lists.newArrayList(spd1, spd2, spd3, spd4).forEach((spd) -> {
- for (ShufflePartitionedBlock shufflePartitionedBlock :
spd.getBlockList()) {
- exceptTaskIds.add(shufflePartitionedBlock.getTaskAttemptId());
- }
- });
-
// validate get shuffle data
ShuffleDataResult sdr = shuffleBufferManager.getShuffleData(
- appId, 2, 0, Constants.INVALID_BLOCK_ID, 60, exceptTaskIds, null);
+ appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
assertArrayEquals(spd2.getBlockList()[0].getData(), sdr.getData());
long lastBlockId = spd2.getBlockList()[0].getBlockId();
- processedBlockIds = Roaring64NavigableMap.bitmapOf();
sdr = shuffleBufferManager.getShuffleData(
- appId, 2, 0, lastBlockId, 100, exceptTaskIds, null);
+ appId, 2, 0, lastBlockId, 100);
assertArrayEquals(spd3.getBlockList()[0].getData(), sdr.getData());
// flush happen
ShufflePartitionedData spd5 = createData(0, 10);
@@ -198,14 +185,12 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// keep buffer whose size < low water mark
assertEquals(1, bufferPool.get(appId).get(4).get(0).getBlocks().size());
// data in flush buffer now, it also can be got before flush finish
- processedBlockIds = Roaring64NavigableMap.bitmapOf();
sdr = shuffleBufferManager.getShuffleData(
- appId, 2, 0, Constants.INVALID_BLOCK_ID, 60, exceptTaskIds, null);
+ appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
assertArrayEquals(spd2.getBlockList()[0].getData(), sdr.getData());
lastBlockId = spd2.getBlockList()[0].getBlockId();
- processedBlockIds = Roaring64NavigableMap.bitmapOf();
sdr = shuffleBufferManager.getShuffleData(
- appId, 2, 0, lastBlockId, 100, exceptTaskIds, null);
+ appId, 2, 0, lastBlockId, 100);
assertArrayEquals(spd3.getBlockList()[0].getData(), sdr.getData());
// cache data again, it should cause flush
spd1 = createData(0, 10);
@@ -216,14 +201,12 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
bufferPool.get(appId).get(2).get(0).getInFlushBlockMap().clear();
bufferPool.get(appId).get(3).get(0).getInFlushBlockMap().clear();
// empty data return
- processedBlockIds = Roaring64NavigableMap.bitmapOf();
sdr = shuffleBufferManager.getShuffleData(
- appId, 2, 0, Constants.INVALID_BLOCK_ID, 60, exceptTaskIds, null);
+ appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
assertEquals(0, sdr.getData().length);
lastBlockId = spd2.getBlockList()[0].getBlockId();
- processedBlockIds = Roaring64NavigableMap.bitmapOf();
sdr = shuffleBufferManager.getShuffleData(
- appId, 2, 0, lastBlockId, 100, exceptTaskIds, null);
+ appId, 2, 0, lastBlockId, 100);
assertEquals(0, sdr.getData().length);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
index a8513cff..a45f1a55 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
@@ -83,17 +83,6 @@ public class ShuffleBufferTest extends BufferTestBase {
assertEquals(0, shuffleBuffer.getBlocks().size());
}
- private ShuffleDataResult getShuffleData(ShuffleBuffer shuffleBuffer, long
lastBlockId,
- int readBufferSize,
ShufflePartitionedData... spds) {
- Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf();
- for (ShufflePartitionedData spd : spds) {
- for (ShufflePartitionedBlock shufflePartitionedBlock :
spd.getBlockList()) {
- exceptTaskIds.add(shufflePartitionedBlock.getTaskAttemptId());
- }
- }
- return shuffleBuffer.getShuffleData(lastBlockId, readBufferSize,
exceptTaskIds);
- }
-
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/**
@@ -266,7 +255,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
byte[] expectedData = getExpectedData(spd1, spd2);
- ShuffleDataResult sdr = getShuffleData(shuffleBuffer,
Constants.INVALID_BLOCK_ID, 40, spd1, spd2);
+ ShuffleDataResult sdr =
shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(),
0, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -277,7 +266,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
expectedData = getExpectedData(spd1, spd2);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 40, spd1,
spd2);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(),
0, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -288,7 +277,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
expectedData = getExpectedData(spd1, spd2);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 40, spd1,
spd2);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(),
0, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -301,13 +290,13 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.append(spd2);
shuffleBuffer.append(spd3);
expectedData = getExpectedData(spd1, spd2);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 25, spd1,
spd2);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 25);
compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(),
0, 2);
assertArrayEquals(expectedData, sdr.getData());
// case4: cached data only, blockId != -1 && exist, readBufferSize <
buffer size
long lastBlockId = spd2.getBlockList()[0].getBlockId();
- sdr = getShuffleData(shuffleBuffer, lastBlockId, 25, spd3);
+ sdr = shuffleBuffer.getShuffleData(lastBlockId, 25);
expectedData = getExpectedData(spd3);
compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(),
2, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -320,19 +309,19 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.append(spd2);
ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent("appId", 0, 0,
1, null);
assertEquals(0, shuffleBuffer.getBlocks().size());
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 20, spd1,
spd2);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
compareBufferSegment(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()),
sdr.getBufferSegments(), 0, 2);
expectedData = getExpectedData(spd1, spd2);
assertArrayEquals(expectedData, sdr.getData());
// case5: flush data only, blockId = lastBlockId
- sdr = getShuffleData(shuffleBuffer, spd2.getBlockList()[0].getBlockId(),
20, spd1, spd2);
+ sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(),
20);
assertEquals(0, sdr.getBufferSegments().size());
// case6: no data in buffer & flush buffer
shuffleBuffer = new ShuffleBuffer(200);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 10, spd1,
spd2);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
assertEquals(0, sdr.getBufferSegments().size());
assertEquals(0, sdr.getData().length);
@@ -385,7 +374,7 @@ public class ShuffleBufferTest extends BufferTestBase {
List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedData = getExpectedData(spd1);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 10, spd1);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -394,7 +383,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedData = getExpectedData(spd2);
- sdr = getShuffleData(shuffleBuffer, spd1.getBlockList()[0].getBlockId(),
10, spd2);
+ sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 1, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -403,7 +392,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedData = getExpectedData(spd1, spd2);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 20, spd1,
spd2);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -412,7 +401,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedData = getExpectedData(spd2, spd3);
- sdr = getShuffleData(shuffleBuffer, spd1.getBlockList()[0].getBlockId(),
20, spd2, spd3);
+ sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 1, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -422,7 +411,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedData = getExpectedData(spd1, spd2, spd3, spd4);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 50, spd1,
spd2, spd3, spd4);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 50);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 4);
assertArrayEquals(expectedData, sdr.getData());
@@ -432,7 +421,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedData = getExpectedData(spd2, spd3);
- sdr = getShuffleData(shuffleBuffer, spd1.getBlockList()[0].getBlockId(),
20, spd2, spd3);
+ sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 1, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -441,7 +430,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedData = getExpectedData(spd4);
- sdr = getShuffleData(shuffleBuffer, spd3.getBlockList()[0].getBlockId(),
10, spd4);
+ sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -450,7 +439,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedData = getExpectedData(spd6);
- sdr = getShuffleData(shuffleBuffer, spd5.getBlockList()[0].getBlockId(),
10, spd6);
+ sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -459,7 +448,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedData = getExpectedData(spd4, spd5, spd6);
- sdr = getShuffleData(shuffleBuffer, spd3.getBlockList()[0].getBlockId(),
40, spd4, spd5, spd6);
+ sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(),
40);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 3);
assertArrayEquals(expectedData, sdr.getData());
@@ -470,7 +459,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
expectedData = getExpectedData(spd3, spd4, spd5, spd6, spd7);
- sdr = getShuffleData(shuffleBuffer, spd2.getBlockList()[0].getBlockId(),
70, spd3, spd4, spd5, spd6, spd7);
+ sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(),
70);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 5);
assertArrayEquals(expectedData, sdr.getData());
@@ -480,7 +469,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
expectedData = getExpectedData(spd6, spd7);
- sdr = getShuffleData(shuffleBuffer, spd5.getBlockList()[0].getBlockId(),
20, spd6, spd7);
+ sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -490,7 +479,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
expectedData = getExpectedData(spd6, spd7, spd8, spd9);
- sdr = getShuffleData(shuffleBuffer, spd5.getBlockList()[0].getBlockId(),
50, spd6, spd7, spd8, spd9);
+ sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(),
50);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 4);
assertArrayEquals(expectedData, sdr.getData());
@@ -500,7 +489,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedData = getExpectedData(spd9, spd10);
- sdr = getShuffleData(shuffleBuffer, spd8.getBlockList()[0].getBlockId(),
20, spd9, spd10);
+ sdr = shuffleBuffer.getShuffleData(spd8.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -509,7 +498,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedData = getExpectedData(spd10);
- sdr = getShuffleData(shuffleBuffer, spd9.getBlockList()[0].getBlockId(),
10, spd10);
+ sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -518,7 +507,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedData = getExpectedData(spd12);
- sdr = getShuffleData(shuffleBuffer, spd11.getBlockList()[0].getBlockId(),
10, spd12);
+ sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -527,7 +516,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedData = getExpectedData(spd10, spd11, spd12);
- sdr = getShuffleData(shuffleBuffer, spd9.getBlockList()[0].getBlockId(),
40, spd10, spd11, spd12);
+ sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(),
40);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 3);
assertArrayEquals(expectedData, sdr.getData());
@@ -537,7 +526,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getBlocks());
expectedData = getExpectedData(spd12, spd13);
- sdr = getShuffleData(shuffleBuffer, spd11.getBlockList()[0].getBlockId(),
20, spd12, spd13);
+ sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -546,7 +535,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
expectedBlocks.addAll(shuffleBuffer.getBlocks());
expectedData = getExpectedData(spd13);
- sdr = getShuffleData(shuffleBuffer, spd12.getBlockList()[0].getBlockId(),
10, spd13);
+ sdr = shuffleBuffer.getShuffleData(spd12.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -555,7 +544,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
expectedBlocks.addAll(shuffleBuffer.getBlocks());
expectedData = getExpectedData(spd14, spd15);
- sdr = getShuffleData(shuffleBuffer, spd13.getBlockList()[0].getBlockId(),
20, spd14, spd15);
+ sdr = shuffleBuffer.getShuffleData(spd13.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 1, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -564,7 +553,7 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
expectedBlocks.addAll(shuffleBuffer.getBlocks());
expectedData = getExpectedData(spd15);
- sdr = getShuffleData(shuffleBuffer, spd14.getBlockList()[0].getBlockId(),
10, spd15);
+ sdr = shuffleBuffer.getShuffleData(spd14.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 1);
assertArrayEquals(expectedData, sdr.getData());
@@ -574,7 +563,7 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getBlocks());
expectedData = getExpectedData(spd12, spd13, spd14, spd15);
- sdr = getShuffleData(shuffleBuffer, spd11.getBlockList()[0].getBlockId(),
50, spd12, spd13, spd14, spd15);
+ sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(),
50);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 2, 4);
assertArrayEquals(expectedData, sdr.getData());
@@ -588,24 +577,20 @@ public class ShuffleBufferTest extends BufferTestBase {
expectedBlocks.addAll(shuffleBuffer.getBlocks());
expectedData = getExpectedData(spd1, spd2, spd3, spd4, spd5, spd6, spd7,
spd8, spd9,
spd10, spd11, spd12, spd13, spd14, spd15);
- sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 220,
- spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9,
- spd10, spd11, spd12, spd13, spd14, spd15);
+ sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 220);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 15);
assertArrayEquals(expectedData, sdr.getData());
// case7 after get spd15
- sdr = getShuffleData(shuffleBuffer, spd15.getBlockList()[0].getBlockId(),
20,
- spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9,
- spd10, spd11, spd12, spd13, spd14, spd15);
+ sdr = shuffleBuffer.getShuffleData(spd15.getBlockList()[0].getBlockId(),
20);
assertEquals(0, sdr.getBufferSegments().size());
// case7 can't find blockId, read from start
expectedBlocks = Lists.newArrayList(
shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedData = getExpectedData(spd1, spd2);
- sdr = getShuffleData(shuffleBuffer, -200, 20, spd1, spd2);
+ sdr = shuffleBuffer.getShuffleData(-200, 20);
compareBufferSegment(expectedBlocks,
sdr.getBufferSegments(), 0, 2);
assertArrayEquals(expectedData, sdr.getData());
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index fddd1650..e8fc4046 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -123,10 +123,7 @@ public class ShuffleHandlerFactory {
request.getReadBufferSize(),
shuffleServerClient,
request.getExpectTaskIds(),
- request.getExpectBlockIds(),
- request.getProcessBlockIds(),
- request.getBlockSkipStrategy(),
- request.getMaxBlockIdRangeSegments()
+ request.isExpectedTaskIdsBitmapFilterEnable()
);
return memoryClientReadHandler;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 8c052023..3cc2e6ba 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -20,7 +20,6 @@ package org.apache.uniffle.storage.handler.impl;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,25 +27,18 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.RssUtils;
public class MemoryClientReadHandler extends AbstractClientReadHandler {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryClientReadHandler.class);
- private final Roaring64NavigableMap expectBlockIds;
- private final Roaring64NavigableMap processBlockIds;
- private final BlockSkipStrategy blockSkipStrategy;
- private final int maxBlockIdRangeSegments;
private long lastBlockId = Constants.INVALID_BLOCK_ID;
private ShuffleServerClient shuffleServerClient;
private Roaring64NavigableMap expectTaskIds;
private boolean expectedTaskIdsBitmapFilterEnable;
- private List<Long> expectedBlockIdRange = Lists.newArrayList();
// Only for tests
@VisibleForTesting
@@ -63,10 +55,8 @@ public class MemoryClientReadHandler extends
AbstractClientReadHandler {
readBufferSize,
shuffleServerClient,
null,
- null,
- null,
- BlockSkipStrategy.TASK_BITMAP,
- 0);
+ false
+ );
}
public MemoryClientReadHandler(
@@ -76,32 +66,18 @@ public class MemoryClientReadHandler extends
AbstractClientReadHandler {
int readBufferSize,
ShuffleServerClient shuffleServerClient,
Roaring64NavigableMap expectTaskIds,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
- BlockSkipStrategy blockSkipStrategy,
- int maxBlockIdRangeSegments) {
+ boolean expectedTaskIdsBitmapFilterEnable) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.readBufferSize = readBufferSize;
this.shuffleServerClient = shuffleServerClient;
this.expectTaskIds = expectTaskIds;
- this.expectBlockIds = expectBlockIds;
- this.processBlockIds = processBlockIds;
- this.blockSkipStrategy = blockSkipStrategy;
- this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
+ this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
}
@Override
public ShuffleDataResult readShuffleData() {
- if (BlockSkipStrategy.BLOCKID_RANGE.equals(blockSkipStrategy) &&
lastBlockId == Constants.INVALID_BLOCK_ID) {
- Roaring64NavigableMap bitmap = RssUtils.cloneBitMap(expectBlockIds);
- bitmap.xor(processBlockIds);
- expectedBlockIdRange = RssUtils.generateRangeSegments(bitmap,
maxBlockIdRangeSegments);
- if (expectedBlockIdRange.size() == 0) {
- return null;
- }
- }
ShuffleDataResult result = null;
RssGetInMemoryShuffleDataRequest request = new
RssGetInMemoryShuffleDataRequest(
@@ -110,8 +86,7 @@ public class MemoryClientReadHandler extends
AbstractClientReadHandler {
partitionId,
lastBlockId,
readBufferSize,
- expectedTaskIdsBitmapFilterEnable ? expectTaskIds : null,
- expectedBlockIdRange
+ expectedTaskIdsBitmapFilterEnable ? expectTaskIds : null
);
try {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 07b2143f..58729485 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
@@ -45,8 +44,7 @@ public class CreateShuffleReadHandlerRequest {
private Roaring64NavigableMap processBlockIds;
private ShuffleDataDistributionType distributionType;
private Roaring64NavigableMap expectTaskIds;
- private BlockSkipStrategy blockSkipStrategy;
- private int maxBlockIdRangeSegments;
+ private boolean expectedTaskIdsBitmapFilterEnable;
public CreateShuffleReadHandlerRequest() {
}
@@ -179,19 +177,11 @@ public class CreateShuffleReadHandlerRequest {
this.expectTaskIds = expectTaskIds;
}
- public BlockSkipStrategy getBlockSkipStrategy() {
- return blockSkipStrategy;
+ public boolean isExpectedTaskIdsBitmapFilterEnable() {
+ return expectedTaskIdsBitmapFilterEnable;
}
- public void setBlockSkipStrategy(BlockSkipStrategy blockSkipStrategy) {
- this.blockSkipStrategy = blockSkipStrategy;
- }
-
- public int getMaxBlockIdRangeSegments() {
- return maxBlockIdRangeSegments;
- }
-
- public void setMaxBlockIdRangeSegments(int maxBlockIdRangeSegments) {
- this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
+ public void useExpectedTaskIdsBitmapFilter() {
+ this.expectedTaskIdsBitmapFilterEnable = true;
}
}