waitinfuture commented on code in PR #990:
URL: https://github.com/apache/incubator-celeborn/pull/990#discussion_r1032874711


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -669,6 +799,12 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     override def apply(s: Int): util.Set[Integer] = new util.HashSet[Integer]()
   }
 
+  private val commitPartitionRegisterFunc =
+    new util.function.Function[Int, util.Set[CommitPartitionRequest]]() {
+      override def apply(s: Int): util.Set[CommitPartitionRequest] =
+        new util.HashSet[CommitPartitionRequest]()

Review Comment:
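   Ditto: this should create a concurrent set rather than a `util.HashSet` (e.g. `ConcurrentHashMap.newKeySet[CommitPartitionRequest]()`), since `util.HashSet` is not thread-safe.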



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -124,6 +124,29 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   // shuffleId -> set of partition id
   private val inBatchPartitions = new ConcurrentHashMap[Int, util.Set[Integer]]()
 
+  case class CommitPartitionRequest(
+      applicationId: String,
+      shuffleId: Int,
+      oldPartition: PartitionLocation)
+
+  case class ShuffleCommittedInfo(
+      committedMasterIds: util.List[String],
+      committedSlaveIds: util.List[String],
+      failedMasterPartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      failedSlavePartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      committedMasterStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedSlaveStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
+      currentShuffleFileCount: LongAdder)
+
+  private val commitPartitionRequests =
+    new ConcurrentHashMap[Int, util.Set[CommitPartitionRequest]]()

Review Comment:
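   I think the per-shuffle sets stored in this map should be concurrent sets (e.g. created with `ConcurrentHashMap.newKeySet()`), because a plain `util.HashSet` is not thread-safe.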



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -246,6 +283,88 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         batchHandleChangePartitionRequestInterval,
         TimeUnit.MILLISECONDS)
     }
+
+    batchHandleCommitPartitionSchedulerThread.foreach {
+      _.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = {
+            logWarning("Batch commit hard split partition")
+            commitPartitionRequests.asScala.foreach { case (shuffleId, requests) =>
+              batchHandleCommitPartitionExecutors.submit {
+                new Runnable {
+                  override def run(): Unit = {
+                    val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)

Review Comment:
   Agreed; let's do the refactor in a follow-up PR.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -124,6 +124,29 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   // shuffleId -> set of partition id
   private val inBatchPartitions = new ConcurrentHashMap[Int, util.Set[Integer]]()
 
+  case class CommitPartitionRequest(
+      applicationId: String,
+      shuffleId: Int,
+      oldPartition: PartitionLocation)
+
+  case class ShuffleCommittedInfo(
+      committedMasterIds: util.List[String],
+      committedSlaveIds: util.List[String],
+      failedMasterPartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      failedSlavePartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      committedMasterStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedSlaveStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
+      currentShuffleFileCount: LongAdder)
+
+  private val commitPartitionRequests =
+    new ConcurrentHashMap[Int, util.Set[CommitPartitionRequest]]()
+  private val inBatchCommitPartitionRequests =
+    new ConcurrentHashMap[Int, util.Set[CommitPartitionRequest]]()

Review Comment:
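   Ditto: the per-shuffle sets in `inBatchCommitPartitionRequests` should also be concurrent sets rather than plain `util.HashSet`s.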



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to