This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1fbdfe57 feat: introduce storage manager selector to support more
selector strategy (#621)
1fbdfe57 is described below
commit 1fbdfe576f0a164f173cbff9ad83dccccb909ee2
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Mar 1 19:16:54 2023 +0800
feat: introduce storage manager selector to support more selector strategy
(#621)
### What changes were proposed in this pull request?
1. Introduce a storage manager selector so that `MultiStorageManager` can
support more selection strategies.
2. Introduce the conf `rss.server.multistorage.manager.selector.class`
to support different flush strategies. For example, huge partitions can be
flushed directly to HDFS while normal partitions are flushed to local disk
when single buffer flush is enabled.
### Why are the changes needed?
Solving the problem mentioned in
https://github.com/apache/incubator-uniffle/issues/378#issuecomment-1373447729.
In the current codebase, when a huge partition is encountered and single buffer
flush is enabled, normal partition data is also flushed to HDFS. This is
undesirable, because the local disk is free and flushes faster than HDFS.
But if single buffer flush is disabled, the events of a huge partition created
before it is marked as huge may be large, which slows down flushing and in turn
causes requests for buffer allocation to fail.
To address the above problems, this PR lets a single event carrying e.g. 100 MB
be flushed to either HDFS or a local file, depending on the conf
`rss.server.multistorage.manager.selector.class`.
### Does this PR introduce _any_ user-facing change?
Yes. Doc will be updated later.
### How was this patch tested?
1. UTs
---
docs/server_guide.md | 1 +
.../uniffle/server/ShuffleDataFlushEvent.java | 10 ++++
.../apache/uniffle/server/ShuffleServerConf.java | 7 +++
.../apache/uniffle/server/ShuffleTaskManager.java | 7 ++-
.../server/buffer/ShuffleBufferManager.java | 70 +++++++++++-----------
.../server/storage/MultiStorageManager.java | 51 +++++++++++-----
.../multi/DefaultStorageManagerSelector.java | 47 +++++++++++++++
.../multi/FallbackBasedStorageManagerSelector.java | 57 ++++++++++++++++++
...gePartitionSensitiveStorageManagerSelector.java | 43 +++++++++++++
.../storage/multi/StorageManagerSelector.java | 31 ++++++++++
.../server/storage/MultiStorageManagerTest.java | 38 ++++++++++++
11 files changed, 310 insertions(+), 52 deletions(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index fe20f342..bfae4405 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -89,6 +89,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.server.leak.shuffledata.check.interval | 3600000 | The
interval of leak shuffle data check (ms)
|
| rss.server.max.concurrency.of.single.partition.writer | 1 | The max
concurrency of single partition writer, the data partition file number is equal
to this value. Default value is 1. This config could improve the writing speed,
especially for huge partition.
|
| rss.metrics.reporter.class | - | The class
of metrics reporter.
|
+|rss.server.multistorage.manager.selector.class |
org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector | The
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is
`DefaultStorageManagerSelector`, and another
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's
data to cold storage.
|
### Advanced Configurations
|Property Name|Default| Description
|
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 8dab3ac6..56abf7a2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -47,6 +47,8 @@ public class ShuffleDataFlushEvent {
private Storage underStorage;
private final List<Runnable> cleanupCallbackChains;
+ private boolean ownedByHugePartition = false;
+
public ShuffleDataFlushEvent(
long eventId,
String appId,
@@ -163,4 +165,12 @@ public class ShuffleDataFlushEvent {
+ ", underStorage=" + (underStorage == null ? null :
underStorage.getClass().getSimpleName())
+ ", isPended=" + isPended;
}
+
+ /**
+ * Returns whether this event has been marked as belonging to a huge partition.
+ * The flag is initialised to false and is set by markOwnedByHugePartition().
+ */
+ public boolean isOwnedByHugePartition() {
+ return ownedByHugePartition;
+ }
+
+ /** Marks this event as belonging to a huge partition by setting the flag to true. */
+ public void markOwnedByHugePartition() {
+ this.ownedByHugePartition = true;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5cf52049..8051669e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -284,6 +284,13 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(64L * 1024L * 1024L)
.withDescription("For multistorage, the event size exceed this value,
flush data to cold storage");
+ public static final ConfigOption<String> MULTISTORAGE_MANAGER_SELECTOR_CLASS
= ConfigOptions
+ .key("rss.server.multistorage.manager.selector.class")
+ .stringType()
+
.defaultValue("org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector")
+ .withDescription("For multistorage, the storage manager selector
strategy to support "
+ + "policies of flushing to different storages");
+
public static final ConfigOption<String>
MULTISTORAGE_FALLBACK_STRATEGY_CLASS = ConfigOptions
.key("rss.server.multistorage.fallback.strategy.class")
.stringType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 32f56485..e723d22c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -135,6 +135,10 @@ public class ShuffleTaskManager {
this::triggerFlush, triggerFlushInterval / 2,
triggerFlushInterval, TimeUnit.MILLISECONDS);
}
+ if (shuffleBufferManager != null) {
+ shuffleBufferManager.setShuffleTaskManager(this);
+ }
+
// the thread for clear expired resources
clearResourceThread = () -> {
while (true) {
@@ -204,8 +208,7 @@ public class ShuffleTaskManager {
appId,
shuffleId,
isPreAllocated,
- spd,
- this::getPartitionDataSize
+ spd
);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 497b3430..a69a28e1 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -41,16 +41,17 @@ import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.common.util.TripleFunction;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskManager;
public class ShuffleBufferManager {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleBufferManager.class);
+ private ShuffleTaskManager shuffleTaskManager;
private final ShuffleFlushManager shuffleFlushManager;
private long capacity;
private long readCapacity;
@@ -103,6 +104,10 @@ public class ShuffleBufferManager {
);
}
+ /**
+ * Stores the task manager that isHugePartition(appId, shuffleId, partitionId) queries
+ * for partition data sizes. While no task manager is set, that check returns false.
+ */
+ public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
+ this.shuffleTaskManager = taskManager;
+ }
+
public StatusCode registerBuffer(String appId, int shuffleId, int
startPartition, int endPartition) {
bufferPool.putIfAbsent(appId, Maps.newConcurrentMap());
Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers =
bufferPool.get(appId);
@@ -120,27 +125,11 @@ public class ShuffleBufferManager {
return StatusCode.SUCCESS;
}
- // Only for tests
public StatusCode cacheShuffleData(
String appId,
int shuffleId,
boolean isPreAllocated,
ShufflePartitionedData spd) {
- return cacheShuffleData(
- appId,
- shuffleId,
- isPreAllocated,
- spd,
- null
- );
- }
-
- public StatusCode cacheShuffleData(
- String appId,
- int shuffleId,
- boolean isPreAllocated,
- ShufflePartitionedData spd,
- TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc)
{
if (!isPreAllocated && isFull()) {
LOG.warn("Got unexpected data, can't cache it because the space is
full");
return StatusCode.NO_BUFFER;
@@ -165,8 +154,7 @@ public class ShuffleBufferManager {
shuffleId,
spd.getPartitionId(),
entry.getKey().lowerEndpoint(),
- entry.getKey().upperEndpoint(),
- getPartitionDataSizeFunc
+ entry.getKey().upperEndpoint()
);
flushIfNecessary();
}
@@ -232,19 +220,12 @@ public class ShuffleBufferManager {
int shuffleId,
int partitionId,
int startPartition,
- int endPartition,
- TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc)
{
+ int endPartition) {
+ boolean isHugePartition = isHugePartition(appId, shuffleId, partitionId);
// When we use multi storage and trigger single buffer flush, the buffer
size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold
storage will be useless.
- if (this.bufferFlushEnabled && buffer.getSize() >
this.bufferFlushThreshold) {
- flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
- return;
- }
-
- if (getPartitionDataSizeFunc != null
- && getPartitionDataSizeFunc.accept(appId, shuffleId, partitionId) >
hugePartitionSizeThreshold
- && buffer.getSize() > this.bufferFlushThreshold) {
- flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
+ if ((isHugePartition || this.bufferFlushEnabled) && buffer.getSize() >
this.bufferFlushThreshold) {
+ flushBuffer(buffer, appId, shuffleId, startPartition, endPartition,
isHugePartition);
return;
}
}
@@ -265,12 +246,19 @@ public class ShuffleBufferManager {
for (Map.Entry<Range<Integer>, ShuffleBuffer> entry :
buffers.asMapOfRanges().entrySet()) {
ShuffleBuffer buffer = entry.getValue();
Range<Integer> range = entry.getKey();
- flushBuffer(buffer, appId, shuffleId, range.lowerEndpoint(),
range.upperEndpoint());
+ flushBuffer(
+ buffer,
+ appId,
+ shuffleId,
+ range.lowerEndpoint(),
+ range.upperEndpoint(),
+ isHugePartition(appId, shuffleId, range.lowerEndpoint())
+ );
}
}
protected void flushBuffer(ShuffleBuffer buffer, String appId,
- int shuffleId, int startPartition, int endPartition) {
+ int shuffleId, int startPartition, int endPartition, boolean
isHugePartition) {
ShuffleDataFlushEvent event =
buffer.toFlushEvent(
appId,
@@ -284,6 +272,9 @@ public class ShuffleBufferManager {
event.addCleanupCallback(() -> releaseMemory(event.getSize(), true,
false));
updateShuffleSize(appId, shuffleId, -event.getSize());
inFlushSize.addAndGet(event.getSize());
+ if (isHugePartition) {
+ event.markOwnedByHugePartition();
+ }
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
shuffleFlushManager.addToFlushQueue(event);
}
@@ -394,8 +385,14 @@ public class ShuffleBufferManager {
for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
Range<Integer> range = rangeEntry.getKey();
- flushBuffer(rangeEntry.getValue(), appId, shuffleId,
- range.lowerEndpoint(), range.upperEndpoint());
+ flushBuffer(
+ rangeEntry.getValue(),
+ appId,
+ shuffleId,
+ range.lowerEndpoint(),
+ range.upperEndpoint(),
+ isHugePartition(appId, shuffleId, range.lowerEndpoint())
+ );
}
}
}
@@ -576,6 +573,11 @@ public class ShuffleBufferManager {
}
}
+ /**
+ * Checks whether the given partition counts as huge, i.e. whether the data size the
+ * task manager reports for it exceeds hugePartitionSizeThreshold.
+ *
+ * @return false when no task manager has been set; otherwise the result of the comparison
+ */
+ boolean isHugePartition(String appId, int shuffleId, int partitionId) {
+ return shuffleTaskManager != null
+ && shuffleTaskManager.getPartitionDataSize(appId, shuffleId,
partitionId) > hugePartitionSizeThreshold;
+ }
+
public boolean isHugePartition(long usedPartitionDataSize) {
return usedPartitionDataSize > hugePartitionSizeThreshold;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index ed01de8f..a9c4af78 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.event.PurgeEvent;
+import org.apache.uniffle.server.storage.multi.StorageManagerSelector;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -43,25 +44,53 @@ public class MultiStorageManager implements StorageManager {
private final StorageManager warmStorageManager;
private final StorageManager coldStorageManager;
- private final long flushColdStorageThresholdSize;
- private AbstractStorageManagerFallbackStrategy
storageManagerFallbackStrategy;
private final Cache<ShuffleDataFlushEvent, StorageManager>
eventOfUnderStorageManagers;
+ private final StorageManagerSelector storageManagerSelector;
MultiStorageManager(ShuffleServerConf conf) {
warmStorageManager = new LocalStorageManager(conf);
coldStorageManager = new HdfsStorageManager(conf);
- flushColdStorageThresholdSize =
conf.getSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE);
+
try {
- storageManagerFallbackStrategy = loadFallbackStrategy(conf);
+ AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy =
loadFallbackStrategy(conf);
+ this.storageManagerSelector = loadManagerSelector(
+ conf,
+ storageManagerFallbackStrategy,
+ warmStorageManager,
+ coldStorageManager
+ );
} catch (Exception e) {
- throw new RuntimeException("Load fallback strategy failed.", e);
+ throw new RuntimeException("Errors on loading selector manager.", e);
}
+
long cacheTimeout =
conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
eventOfUnderStorageManagers = CacheBuilder.newBuilder()
.expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
.build();
}
+  /**
+   * Reflectively instantiates the {@link StorageManagerSelector} whose class name is
+   * configured under {@link ShuffleServerConf#MULTISTORAGE_MANAGER_SELECTOR_CLASS}.
+   *
+   * <p>The selector class must expose a public constructor with the parameter types
+   * {@code (StorageManager, StorageManager, AbstractStorageManagerFallbackStrategy,
+   * ShuffleServerConf)}.
+   *
+   * @param conf server configuration; supplies the class name and is passed to the selector
+   * @param storageManagerFallbackStrategy fallback strategy passed to the selector
+   * @param warmStorageManager warm storage manager passed to the selector
+   * @param coldStorageManager cold storage manager passed to the selector
+   * @return the newly created selector
+   * @throws Exception if the class cannot be loaded, has no matching constructor,
+   *     or cannot be instantiated
+   */
+  private StorageManagerSelector loadManagerSelector(
+      ShuffleServerConf conf,
+      AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy,
+      StorageManager warmStorageManager,
+      StorageManager coldStorageManager) throws Exception {
+    String name = conf.get(ShuffleServerConf.MULTISTORAGE_MANAGER_SELECTOR_CLASS);
+    Class<?> klass = Class.forName(name);
+    // Use the declared ShuffleServerConf type rather than conf.getClass():
+    // getConstructor matches formal parameter types exactly, so passing a subclass
+    // instance of the conf would otherwise fail with NoSuchMethodException.
+    Constructor<?> constructor = klass.getConstructor(
+        StorageManager.class,
+        StorageManager.class,
+        AbstractStorageManagerFallbackStrategy.class,
+        ShuffleServerConf.class
+    );
+    return (StorageManagerSelector) constructor.newInstance(
+        warmStorageManager,
+        coldStorageManager,
+        storageManagerFallbackStrategy,
+        conf
+    );
+  }
+
public static AbstractStorageManagerFallbackStrategy loadFallbackStrategy(
ShuffleServerConf conf) throws Exception {
String name =
conf.getString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
@@ -100,17 +129,7 @@ public class MultiStorageManager implements StorageManager
{
}
private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
- StorageManager storageManager;
- if (event.getSize() > flushColdStorageThresholdSize) {
- storageManager = coldStorageManager;
- } else {
- storageManager = warmStorageManager;
- }
-
- if (!storageManager.canWrite(event) || event.getRetryTimes() > 0) {
- storageManager = storageManagerFallbackStrategy.tryFallback(
- storageManager, event, warmStorageManager, coldStorageManager);
- }
+ StorageManager storageManager = storageManagerSelector.select(event);
eventOfUnderStorageManagers.put(event, storageManager);
event.addCleanupCallback(() ->
eventOfUnderStorageManagers.invalidate(event));
return storageManager;
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/multi/DefaultStorageManagerSelector.java
b/server/src/main/java/org/apache/uniffle/server/storage/multi/DefaultStorageManagerSelector.java
new file mode 100644
index 00000000..50f49823
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/storage/multi/DefaultStorageManagerSelector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import
org.apache.uniffle.server.storage.AbstractStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.StorageManager;
+
+import static
org.apache.uniffle.server.ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE;
+
+/**
+ * Size-based selector: an event whose size exceeds the configured
+ * {@code FLUSH_COLD_STORAGE_THRESHOLD_SIZE} is routed to the cold storage manager,
+ * every other event to the warm one. Fallback handling is inherited from
+ * {@link FallbackBasedStorageManagerSelector}.
+ */
+public class DefaultStorageManagerSelector extends FallbackBasedStorageManagerSelector {
+  private final long flushColdStorageThresholdSize;
+
+  /**
+   * @param warmStorageManager manager chosen for events at or below the threshold
+   * @param coldStorageManager manager chosen for events above the threshold
+   * @param fallbackStrategy strategy handed to the base class for fallback selection
+   * @param rssConf configuration the threshold is read from
+   */
+  public DefaultStorageManagerSelector(
+      StorageManager warmStorageManager,
+      StorageManager coldStorageManager,
+      AbstractStorageManagerFallbackStrategy fallbackStrategy,
+      ShuffleServerConf rssConf) {
+    super(warmStorageManager, coldStorageManager, fallbackStrategy);
+    // Read the threshold with getSizeAsBytes, the accessor MultiStorageManager used
+    // for this option before the selection logic was moved into this class.
+    this.flushColdStorageThresholdSize = rssConf.getSizeAsBytes(FLUSH_COLD_STORAGE_THRESHOLD_SIZE);
+  }
+
+  /** Picks cold storage for events strictly larger than the threshold, warm storage otherwise. */
+  @Override
+  StorageManager regularSelect(ShuffleDataFlushEvent flushEvent) {
+    return flushEvent.getSize() > flushColdStorageThresholdSize
+        ? coldStorageManager
+        : warmStorageManager;
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/multi/FallbackBasedStorageManagerSelector.java
b/server/src/main/java/org/apache/uniffle/server/storage/multi/FallbackBasedStorageManagerSelector.java
new file mode 100644
index 00000000..50443091
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/storage/multi/FallbackBasedStorageManagerSelector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import
org.apache.uniffle.server.storage.AbstractStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.StorageManager;
+
+/**
+ * Base selector that combines a subclass-provided first choice with a fallback step:
+ * select() asks regularSelect() for a candidate and, when that candidate cannot write
+ * the event or the event has already been retried, hands the decision to the
+ * fallback strategy.
+ */
+public abstract class FallbackBasedStorageManagerSelector implements
StorageManagerSelector {
+ protected final StorageManager warmStorageManager;
+ protected final StorageManager coldStorageManager;
+ private final AbstractStorageManagerFallbackStrategy fallbackStrategy;
+
+ /** Stores the two storage managers and the fallback strategy for later selection. */
+ public FallbackBasedStorageManagerSelector(
+ StorageManager warmStorageManager,
+ StorageManager coldStorageManager,
+ AbstractStorageManagerFallbackStrategy fallbackStrategy) {
+ this.warmStorageManager = warmStorageManager;
+ this.coldStorageManager = coldStorageManager;
+ this.fallbackStrategy = fallbackStrategy;
+ }
+
+ /** Returns the storage manager the subclass prefers for the event, before any fallback. */
+ abstract StorageManager regularSelect(ShuffleDataFlushEvent flushEvent);
+
+ /** Delegates to the fallback strategy, passing the candidate plus both managers. */
+ private StorageManager fallbackSelect(ShuffleDataFlushEvent flushEvent,
StorageManager candidateStorageManager) {
+ return fallbackStrategy.tryFallback(
+ candidateStorageManager,
+ flushEvent,
+ warmStorageManager,
+ coldStorageManager
+ );
+ }
+
+ /**
+ * Selects the storage manager for the event: the regular choice, unless it cannot
+ * write the event or the event's retry count is above zero, in which case the
+ * fallback strategy's result is returned.
+ */
+ @Override
+ public StorageManager select(ShuffleDataFlushEvent flushEvent) {
+ StorageManager storageManager = regularSelect(flushEvent);
+ if (!storageManager.canWrite(flushEvent) || flushEvent.getRetryTimes() >
0) {
+ storageManager = fallbackSelect(flushEvent, storageManager);
+ }
+ return storageManager;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/multi/HugePartitionSensitiveStorageManagerSelector.java
b/server/src/main/java/org/apache/uniffle/server/storage/multi/HugePartitionSensitiveStorageManagerSelector.java
new file mode 100644
index 00000000..808d5d47
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/storage/multi/HugePartitionSensitiveStorageManagerSelector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import
org.apache.uniffle.server.storage.AbstractStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.StorageManager;
+
+/**
+ * Selector whose regular choice depends only on the event's huge-partition flag:
+ * flagged events go to the cold storage manager, all others to the warm one.
+ * Fallback handling is inherited from the base class.
+ */
+public class HugePartitionSensitiveStorageManagerSelector extends
FallbackBasedStorageManagerSelector {
+
+ /**
+ * The rssConf parameter is not read here; the four-argument shape matches the
+ * constructor signature that MultiStorageManager looks up reflectively.
+ */
+ public HugePartitionSensitiveStorageManagerSelector(
+ StorageManager warmStorageManager,
+ StorageManager coldStorageManager,
+ AbstractStorageManagerFallbackStrategy fallbackStrategy,
+ ShuffleServerConf rssConf) {
+ super(warmStorageManager, coldStorageManager, fallbackStrategy);
+ }
+
+ /** Returns cold storage when the event is owned by a huge partition, warm storage otherwise. */
+ @Override
+ StorageManager regularSelect(ShuffleDataFlushEvent flushEvent) {
+ StorageManager storageManager = warmStorageManager;
+ if (flushEvent.isOwnedByHugePartition()) {
+ storageManager = coldStorageManager;
+ }
+ return storageManager;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/multi/StorageManagerSelector.java
b/server/src/main/java/org/apache/uniffle/server/storage/multi/StorageManagerSelector.java
new file mode 100644
index 00000000..55cbd509
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/storage/multi/StorageManagerSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.storage.StorageManager;
+
+/** Strategy for choosing which StorageManager a flush event should be written through. */
+public interface StorageManagerSelector {
+
+ /** Returns the storage manager that should handle the given flush event. */
+ StorageManager select(ShuffleDataFlushEvent flushEvent);
+
+ /**
+ * Names the reasons an event may be preferred for cold storage.
+ * NOTE(review): no code visible in this patch references these constants - confirm usage.
+ */
+ enum ColdStoragePreferredFactor {
+ HUGE_EVENT,
+ OWNED_BY_HUGE_PARTITION,
+ }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index d385e933..3cea6d32 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -56,6 +56,44 @@ public class MultiStorageManagerTest {
assertTrue((manager.selectStorage(event) instanceof HdfsStorage));
}
+ @Test
+ public void testStorageManagerSelectorOfPreferCold() {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+ conf.setLong(ShuffleServerConf.DISK_CAPACITY, 10000L);
+ conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
+ conf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+ RotateStorageManagerFallbackStrategy.class.getCanonicalName());
+ conf.set(
+ ShuffleServerConf.MULTISTORAGE_MANAGER_SELECTOR_CLASS,
+
"org.apache.uniffle.server.storage.multi.HugePartitionSensitiveStorageManagerSelector"
+ );
+ MultiStorageManager manager = new MultiStorageManager(conf);
+ String remoteStorage = "test";
+ String appId = "selectStorageManagerIfCanNotWriteTest_appId";
+ manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+
+ /*
+ * With HugePartitionSensitiveStorageManagerSelector configured, only an event
+ * marked as owned by a huge partition is expected to go to cold storage
+ * (cf. StorageManagerSelector.ColdStoragePreferredFactor#OWNED_BY_HUGE_PARTITION):
+ * the unmarked event below must select LocalStorage, the marked one HdfsStorage.
+ */
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+ new ShufflePartitionedBlock(10001, 1000, 1, 1, 1L, null)
+ );
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 100000, blocks, null, null);
+ Storage storage = manager.selectStorage(event);
+ assertTrue(storage instanceof LocalStorage);
+
+ ShuffleDataFlushEvent event1 = new ShuffleDataFlushEvent(1, appId, 1, 1,
1, 10, blocks, null, null);
+ event1.markOwnedByHugePartition();
+ storage = manager.selectStorage(event1);
+ assertTrue(storage instanceof HdfsStorage);
+ }
+
@Test
public void underStorageManagerSelectionTest() {
ShuffleServerConf conf = new ShuffleServerConf();