waitinfuture commented on code in PR #990:
URL: https://github.com/apache/incubator-celeborn/pull/990#discussion_r1033621234


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -249,6 +285,98 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         batchHandleChangePartitionRequestInterval,
         TimeUnit.MILLISECONDS)
     }
+
+    batchHandleCommitPartitionSchedulerThread.foreach {
+      _.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = {
+            committedPartitionInfo.asScala.foreach { case (shuffleId, shuffleCommittedInfo) =>
+              batchHandleCommitPartitionExecutors.submit {
+                new Runnable {
+                  override def run(): Unit = {
+                    if (inProcessStageEndShuffleSet.contains(shuffleId) ||
+                      stageEndShuffleSet.contains(shuffleId)) {
+                      logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
+                      shuffleCommittedInfo.commitPartitionRequests.clear()

Review Comment:
   Should put this line inside shuffleCommittedInfo.synchronized.
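
   A minimal sketch of the suggested shape, assuming shuffleCommittedInfo is the
   same object used as the lock by the other accesses in this class:

       if (inProcessStageEndShuffleSet.contains(shuffleId) ||
         stageEndShuffleSet.contains(shuffleId)) {
         logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
         shuffleCommittedInfo.synchronized {
           // drain pending requests under the shared lock, so the scheduler
           // thread cannot interleave between the check above and the clear
           shuffleCommittedInfo.commitPartitionRequests.clear()
         }
       }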



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1178,6 +1265,94 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
   }
 
+  private def commitFiles(
+      applicationId: String,
+      shuffleId: Int,
+      shuffleCommittedInfo: ShuffleCommittedInfo,
+      worker: WorkerInfo,
+      masterIds: util.List[String],
+      slaveIds: util.List[String],
+      commitFilesFailedWorkers: ConcurrentHashMap[WorkerInfo, (StatusCode, Long)]): Unit = {
+
+    val res =
+      if (!testRetryCommitFiles) {
+        val commitFiles = CommitFiles(
+          applicationId,
+          shuffleId,
+          masterIds,
+          slaveIds,
+          shuffleMapperAttempts.get(shuffleId),
+          commitEpoch.incrementAndGet())
+        shuffleCommittedInfo.inFlightCommitRequest.incrementAndGet()
+        val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+        shuffleCommittedInfo.inFlightCommitRequest.decrementAndGet()
+
+        res.status match {
+          case StatusCode.SUCCESS => // do nothing
+          case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.FAILED =>
+            logDebug(s"Request $commitFiles return ${res.status} for " +
+              s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
+            commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
+          case _ => // won't happen
+        }
+        res
+      } else {
+        // for test
+        val commitFiles1 = CommitFiles(
+          applicationId,
+          shuffleId,
+          masterIds.subList(0, masterIds.size() / 2),
+          slaveIds.subList(0, slaveIds.size() / 2),
+          shuffleMapperAttempts.get(shuffleId),
+          commitEpoch.incrementAndGet())
+        val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)
+
+        val commitFiles = CommitFiles(
+          applicationId,
+          shuffleId,
+          masterIds.subList(masterIds.size() / 2, masterIds.size()),
+          slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
+          shuffleMapperAttempts.get(shuffleId),
+          commitEpoch.incrementAndGet())
+        val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+
+        res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
+        res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
+        res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
+        CommitFilesResponse(
+          status = if (res1.status == StatusCode.SUCCESS) res2.status else res1.status,
+          (res1.committedMasterIds.asScala ++ res2.committedMasterIds.asScala).toList.asJava,
+          (res1.committedSlaveIds.asScala ++ res1.committedSlaveIds.asScala).toList.asJava,
+          (res1.failedMasterIds.asScala ++ res1.failedMasterIds.asScala).toList.asJava,
+          (res1.failedSlaveIds.asScala ++ res2.failedSlaveIds.asScala).toList.asJava,
+          res1.committedMasterStorageInfos,
+          res1.committedSlaveStorageInfos,
+          res1.committedMapIdBitMap,
+          res1.totalWritten + res2.totalWritten,
+          res1.fileCount + res2.fileCount)
+      }
+
+    // record committed partitionIds
+    shuffleCommittedInfo.committedMasterIds.addAll(res.committedMasterIds)

Review Comment:
   Ditto: this update to ShuffleCommittedInfo should also happen inside shuffleCommittedInfo.synchronized.
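
   A minimal sketch of the same fix here (assuming committedSlaveIds lives on
   ShuffleCommittedInfo the same way committedMasterIds does):

       // record committed partitionIds under the shared lock, so the batch
       // scheduler thread never observes a partially recorded response
       shuffleCommittedInfo.synchronized {
         shuffleCommittedInfo.committedMasterIds.addAll(res.committedMasterIds)
         shuffleCommittedInfo.committedSlaveIds.addAll(res.committedSlaveIds)
       }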



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -249,6 +285,98 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         batchHandleChangePartitionRequestInterval,
         TimeUnit.MILLISECONDS)
     }
+
+    batchHandleCommitPartitionSchedulerThread.foreach {
+      _.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = {
+            committedPartitionInfo.asScala.foreach { case (shuffleId, shuffleCommittedInfo) =>
+              batchHandleCommitPartitionExecutors.submit {
+                new Runnable {
+                  override def run(): Unit = {
+                    if (inProcessStageEndShuffleSet.contains(shuffleId) ||
+                      stageEndShuffleSet.contains(shuffleId)) {
+                      logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
+                      shuffleCommittedInfo.commitPartitionRequests.clear()
+                    } else {
+                      val currentBatch = shuffleCommittedInfo.synchronized {
+                        val batch = ConcurrentHashMap.newKeySet[CommitPartitionRequest]()
+                        batch.addAll(shuffleCommittedInfo.commitPartitionRequests)
+                        val currentBatch = batch.asScala.filterNot { request =>
+                          shuffleCommittedInfo.handledCommitPartitionRequests.contains(
+                            request.partition)
+                        }
+                        shuffleCommittedInfo.commitPartitionRequests.removeAll(batch)

Review Comment:
   Could simply be shuffleCommittedInfo.commitPartitionRequests.clear() here: batch is a full copy taken under the same lock, so removeAll(batch) drains the whole set anyway.
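
   A sketch of the simplified tail of the block (hedged; clear() is only
   equivalent to removeAll(batch) because the copy and the removal happen under
   the same lock):

       val currentBatch = shuffleCommittedInfo.synchronized {
         val batch = ConcurrentHashMap.newKeySet[CommitPartitionRequest]()
         batch.addAll(shuffleCommittedInfo.commitPartitionRequests)
         val pending = batch.asScala.filterNot { request =>
           shuffleCommittedInfo.handledCommitPartitionRequests.contains(request.partition)
         }
         // equivalent to removeAll(batch): batch is a full copy taken under
         // this lock, so nothing new can have been added in between
         shuffleCommittedInfo.commitPartitionRequests.clear()
         pending
       }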



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -692,6 +834,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     // check if there exists request for the partition, if do just register
     val requests = changePartitionRequests.computeIfAbsent(shuffleId, rpcContextRegisterFunc)
     inBatchPartitions.computeIfAbsent(shuffleId, inBatchShuffleIdRegisterFunc)
+
+    // handle hard split
+    if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get == StatusCode.HARD_SPLIT) {
+      committedPartitionInfo.get(shuffleId).commitPartitionRequests

Review Comment:
   I think we should put all code referring to ShuffleCommittedInfo inside shuffleCommittedInfo.synchronized {}.
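
   A minimal sketch of that shape for the hard-split branch (the diff is cut
   off above, so the add(...) call and its argument are hypothetical
   placeholders for whatever the elided line does):

       // handle hard split
       if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get == StatusCode.HARD_SPLIT) {
         val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
         shuffleCommittedInfo.synchronized {
           // hypothetical placeholder: the actual mutation of
           // commitPartitionRequests should sit inside this lock
           shuffleCommittedInfo.commitPartitionRequests.add(request)
         }
       }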



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -986,108 +1127,49 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
           slavePartMap.put(partition.getUniqueId, partition)
         }
 
-        val masterIds = masterParts.asScala.map(_.getUniqueId).asJava
-        val slaveIds = slaveParts.asScala.map(_.getUniqueId).asJava
-
-        val res =
-          if (!testRetryCommitFiles) {
-            val commitFiles = CommitFiles(
-              applicationId,
-              shuffleId,
-              masterIds,
-              slaveIds,
-              shuffleMapperAttempts.get(shuffleId),
-              commitEpoch.incrementAndGet())
-            val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
-
-            res.status match {
-              case StatusCode.SUCCESS => // do nothing
-              case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.FAILED =>
-                logDebug(s"Request $commitFiles return ${res.status} for " +
-                  s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
-                commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
-              case _ => // won't happen
-            }
-            res
-          } else {
-            // for test
-            val commitFiles1 = CommitFiles(
-              applicationId,
-              shuffleId,
-              masterIds.subList(0, masterIds.size() / 2),
-              slaveIds.subList(0, slaveIds.size() / 2),
-              shuffleMapperAttempts.get(shuffleId),
-              commitEpoch.incrementAndGet())
-            val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)
-
-            val commitFiles = CommitFiles(
-              applicationId,
-              shuffleId,
-              masterIds.subList(masterIds.size() / 2, masterIds.size()),
-              slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
-              shuffleMapperAttempts.get(shuffleId),
-              commitEpoch.incrementAndGet())
-            val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
-
-            res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
-            res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
-            res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
-            CommitFilesResponse(
-              status = if (res1.status == StatusCode.SUCCESS) res2.status else res1.status,
-              (res1.committedMasterIds.asScala ++ res2.committedMasterIds.asScala).toList.asJava,
-              (res1.committedSlaveIds.asScala ++ res1.committedSlaveIds.asScala).toList.asJava,
-              (res1.failedMasterIds.asScala ++ res1.failedMasterIds.asScala).toList.asJava,
-              (res1.failedSlaveIds.asScala ++ res2.failedSlaveIds.asScala).toList.asJava,
-              res1.committedMasterStorageInfos,
-              res1.committedSlaveStorageInfos,
-              res1.committedMapIdBitMap,
-              res1.totalWritten + res2.totalWritten,
-              res1.fileCount + res2.fileCount)
-          }
-
-        // record committed partitionIds
-        committedMasterIds.addAll(res.committedMasterIds)
-        committedSlaveIds.addAll(res.committedSlaveIds)
-
-        // record committed partitions storage hint and disk hint
-        committedMasterStorageInfos.putAll(res.committedMasterStorageInfos)
-        committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
-
-        // record failed partitions
-        failedMasterPartitionIds.putAll(res.failedMasterIds.asScala.map((_, worker)).toMap.asJava)
-        failedSlavePartitionIds.putAll(res.failedSlaveIds.asScala.map((_, worker)).toMap.asJava)
-
-        if (!res.committedMapIdBitMap.isEmpty) {
-          committedMapIdBitmap.putAll(res.committedMapIdBitMap)
-        }
+        val masterIds = masterParts.asScala

Review Comment:
   ditto



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -249,6 +285,98 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         batchHandleChangePartitionRequestInterval,
         TimeUnit.MILLISECONDS)
     }
+
+    batchHandleCommitPartitionSchedulerThread.foreach {
+      _.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = {
+            committedPartitionInfo.asScala.foreach { case (shuffleId, shuffleCommittedInfo) =>
+              batchHandleCommitPartitionExecutors.submit {
+                new Runnable {
+                  override def run(): Unit = {
+                    if (inProcessStageEndShuffleSet.contains(shuffleId) ||
+                      stageEndShuffleSet.contains(shuffleId)) {
+                      logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
+                      shuffleCommittedInfo.commitPartitionRequests.clear()
+                    } else {
+                      val currentBatch = shuffleCommittedInfo.synchronized {
+                        val batch = ConcurrentHashMap.newKeySet[CommitPartitionRequest]()

Review Comment:
   Just use a HashSet: this block already runs inside shuffleCommittedInfo.synchronized and batch never escapes it, so a concurrent set buys nothing.
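
   For example (batch stays local to the synchronized block, so a plain
   java.util set keeps the addAll/removeAll calls working unchanged):

       val batch = new java.util.HashSet[CommitPartitionRequest]()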


