zuston commented on issue #142:
URL:
https://github.com/apache/incubator-uniffle/issues/142#issuecomment-1208955697
I have a question that why it will influence the dynamic allocation? Could u
help give more detail?
I think a shuffle dependency will hold a shuffle handle(rss or sort handle)
which is decided by the method of registerShuffle. This means that a shuffle
only will be managed by one shuffle manager(rss or sort) and served for writer
or reader. The co-operation dont exist in sort and rss.
> SortShuffleManager need ESS to support dynamic allocation, but
RssShuffleManager can't work with ESS.
I dont understand what's the meaning.
Attach the code draft implementation.
### Code Draft
```java
public class DelegationRssShuffleManager implements ShuffleManager {
private ShuffleManager rssShuffleManager;
private ShuffleManager sortShuffleManager;
public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver)
throws Exception {
this.sparkConf = sparkConf;
// dont initialize the shuffle manager in the constructor.
}
private synchronized ShuffleManager createOrGetShuffleManager(boolean
useRss) {
if (useRss) {
if (rssShuffleManager != null) {
return rssShuffleManager;
} else {
try {
final ShuffleManager shuffleManager = new
RssShuffleManager(sparkConf, true);
sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
LOG.info("Use RssShuffleManager");
this.rssShuffleManager = shuffleManager;
return rssShuffleManager;
} catch (Exception exception) {
LOG.warn("Fail to create RssShuffleManager, fallback to
SortShuffleManager {}", exception.getMessage());
}
}
}
if (sortShuffleManager == null) {
try {
final ShuffleManager shuffleManager =
RssSparkShuffleUtils.loadShuffleManager(Constants.SORT_SHUFFLE_MANAGER_NAME,
sparkConf, true);
LOG.info("Use SortShuffleManager");
this.sortShuffleManager = shuffleManager;
} catch (Exception e) {
throw new RssException(e.getMessage());
}
}
return sortShuffleManager;
}
@Override
public <K, V, C> ShuffleHandle registerShuffle(int shuffleId,
ShuffleDependency<K, V, C> dependency) {
ShuffleManager shuffleManager =
createOrGetShuffleManager(tryAccessCluster());
if (!(shuffleManager instanceof RssShuffleManager)) {
shuffleIdsUsingSort.add(shuffleId);
}
return shuffleManager.registerShuffle(shuffleId, dependency);
}
@Override
public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle,
long mapId,
TaskContext context,
ShuffleWriteMetricsReporter metrics) {
ShuffleManager shuffleManager = createOrGetShuffleManager(handle
instanceof RssShuffleHandle);
return shuffleManager.getWriter(handle, mapId, context, metrics);
}
@Override
public <K, C> ShuffleReader<K, C> getReader(
ShuffleHandle handle,
int startPartition,
int endPartition,
TaskContext context,
ShuffleReadMetricsReporter metrics) {
ShuffleManager shuffleManager = createOrGetShuffleManager(handle
instanceof RssShuffleHandle);
return shuffleManager.getReader(handle,
startPartition, endPartition, context, metrics);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]