FMX commented on code in PR #973:
URL:
https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023470930
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
String appId, int shuffleId, int numMappers, int numPartitions) {
+ return registerShuffle(
+ shuffleId,
+ numMappers,
+ numMappers,
+ () ->
+ driverRssMetaService.askSync(
+ RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers,
numPartitions),
+ ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+ }
+
+ @Override
+ public PartitionLocation registerMapPartitionTask(
+ String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+ int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+ logger.info(
+ "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+ mapId,
+ attemptId,
+ partitionId);
+ if (attemptId == 0) {
+ return registerMapPartitionTaskWithFirstAttempt(
+ appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+ }
+
+ // TODO
+ throw new UnsupportedOperationException("can not register shuffle task
with attempt beyond 0");
+ }
+
+ private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+ String appId, int shuffleId, int numMappers, int mapId, int attemptId,
int partitionId) {
+ ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+ registerShuffle(
+ shuffleId,
+ numMappers,
+ numMappers,
+ () ->
+ driverRssMetaService.askSync(
+ RegisterMapPartitionTask$.MODULE$.apply(
+ appId, shuffleId, numMappers, mapId, attemptId,
partitionId),
+ ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+ return partitionLocationMap.get(partitionId);
+ }
+
+ public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
+ int shuffleId,
+ int numMappers,
+ int numPartitions,
+ Callable<PbRegisterShuffleResponse> callable) {
Review Comment:
I think this parameter could simply be a PbRegisterShuffleResponse. There is no
need to pass a Callable object here.
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
String appId, int shuffleId, int numMappers, int numPartitions) {
+ return registerShuffle(
+ shuffleId,
+ numMappers,
+ numMappers,
+ () ->
+ driverRssMetaService.askSync(
+ RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers,
numPartitions),
+ ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+ }
+
+ @Override
+ public PartitionLocation registerMapPartitionTask(
+ String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+ int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+ logger.info(
+ "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+ mapId,
+ attemptId,
+ partitionId);
+ if (attemptId == 0) {
+ return registerMapPartitionTaskWithFirstAttempt(
+ appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+ }
+
+ // TODO
+ throw new UnsupportedOperationException("can not register shuffle task
with attempt beyond 0");
+ }
+
+ private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+ String appId, int shuffleId, int numMappers, int mapId, int attemptId,
int partitionId) {
+ ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+ registerShuffle(
+ shuffleId,
+ numMappers,
+ numMappers,
+ () ->
+ driverRssMetaService.askSync(
+ RegisterMapPartitionTask$.MODULE$.apply(
+ appId, shuffleId, numMappers, mapId, attemptId,
partitionId),
+ ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+ return partitionLocationMap.get(partitionId);
+ }
+
+ public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
Review Comment:
Maybe rename this to registerShuffleInternal? Having several registerShuffle methods is becoming confusing.
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
String appId, int shuffleId, int numMappers, int numPartitions) {
+ return registerShuffle(
+ shuffleId,
+ numMappers,
+ numMappers,
+ () ->
+ driverRssMetaService.askSync(
+ RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers,
numPartitions),
+ ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+ }
+
+ @Override
+ public PartitionLocation registerMapPartitionTask(
+ String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+ int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+ logger.info(
+ "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+ mapId,
+ attemptId,
+ partitionId);
+ if (attemptId == 0) {
+ return registerMapPartitionTaskWithFirstAttempt(
+ appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+ }
+
+ // TODO
+ throw new UnsupportedOperationException("can not register shuffle task
with attempt beyond 0");
+ }
+
+ private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
Review Comment:
Could this be merged into registerMapPartitionTask, e.g. by adding a parameter?
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -379,6 +403,34 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
shuffleId: Int,
numMappers: Int,
numReducers: Int): Unit = {
+ handleOfferAndReserveSlots(context, applicationId, shuffleId, numMappers,
numReducers)
+ }
+
+ private def handleRegisterMapPartitionTask(
+ context: RpcCallContext,
+ applicationId: String,
+ shuffleId: Int,
+ numMappers: Int,
+ attemptId: Int,
Review Comment:
Unused. Maybe it can be deleted.
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -53,7 +54,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf)
extends RpcEndpoin
private val pushReplicateEnabled = conf.pushReplicateEnabled
private val partitionSplitThreshold = conf.partitionSplitThreshold
private val partitionSplitMode = conf.partitionSplitMode
- private val partitionType = conf.shufflePartitionType
+ // shuffle id -> partition type
+ private val shufflePartitionType = new ConcurrentHashMap[Int,
PartitionType]()
Review Comment:
Is this for future use? Will a single application have both map partitions and
reduce partitions?
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -110,6 +111,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
return hdfsFs;
}
+ public abstract PartitionLocation registerMapPartitionTask(
Review Comment:
It looks like nothing uses the shuffle client instance directly, so why
should this method be added here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]