Copilot commented on code in PR #3387:
URL: https://github.com/apache/celeborn/pull/3387#discussion_r2261800104


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -299,6 +258,109 @@ private[deploy] class Controller(
     context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
   }
 
+  private def createWriters(
+      shuffleKey: String,
+      applicationId: String,
+      shuffleId: Int,
+      requestLocs: jList[PartitionLocation],
+      splitThreshold: Long,
+      splitMode: PartitionSplitMode,
+      partitionType: PartitionType,
+      rangeReadFilter: Boolean,
+      userIdentifier: UserIdentifier,
+      partitionSplitEnabled: Boolean,
+      isSegmentGranularityVisible: Boolean,
+      isPrimary: Boolean): jList[PartitionLocation] = {
+    val partitionLocations = new jArrayList[PartitionLocation]()
+    try {
+      def createWriter(partitionLocation: PartitionLocation): PartitionLocation = {
+        createPartitionDataWriter(
+          shuffleKey,
+          applicationId,
+          shuffleId,
+          partitionLocation,
+          splitThreshold,
+          splitMode,
+          partitionType,
+          rangeReadFilter,
+          userIdentifier,
+          partitionSplitEnabled,
+          isSegmentGranularityVisible,
+          isPrimary)
+      }
+      if (createWriterThreadPool == null) {
+        partitionLocations.addAll(requestLocs.asScala.map(createWriter).asJava)
+      } else {
+        partitionLocations.addAll(requestLocs.asScala.map(requestLoc =>
+          createWriterThreadPool.submit(new Callable[PartitionLocation] {
+            override def call(): PartitionLocation = createWriter(requestLoc)
+          })).map(_.get()).asJava)
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Create FileWriter for $shuffleKey failed.", e)
+    }
+    partitionLocations
+  }
+
+  private def createPartitionDataWriter(
+      shuffleKey: String,
+      applicationId: String,
+      shuffleId: Int,
+      requestLoc: PartitionLocation,
+      splitThreshold: Long,
+      splitMode: PartitionSplitMode,
+      partitionType: PartitionType,
+      rangeReadFilter: Boolean,
+      userIdentifier: UserIdentifier,
+      partitionSplitEnabled: Boolean,
+      isSegmentGranularityVisible: Boolean,
+      isPrimary: Boolean): PartitionLocation = {
+    try {
+      var location =
+        if (isPrimary) {
+          partitionLocationInfo.getPrimaryLocation(
+            shuffleKey,
+            requestLoc.getUniqueId)
+        } else {
+          partitionLocationInfo.getReplicaLocation(
+            shuffleKey,
+            requestLoc.getUniqueId)
+        }
+      if (location == null) {
+        location = requestLoc
+        val writer = storageManager.createPartitionDataWriter(
+          applicationId,
+          shuffleId,
+          location,
+          splitThreshold,
+          splitMode,
+          partitionType,
+          rangeReadFilter,
+          userIdentifier,
+          partitionSplitEnabled,
+          isSegmentGranularityVisible)
+        new WorkingPartition(location, writer)
+      } else {
+        location
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Create FileWriter for $requestLoc $shuffleKey failed.", e)
+        throw e

Review Comment:
   [nitpick] Re-throwing the exception after logging it provides no additional 
value since the exception will be propagated anyway. Consider removing the 
try-catch block or handling the exception more meaningfully.
   ```suggestion
       var location =
         if (isPrimary) {
           partitionLocationInfo.getPrimaryLocation(
             shuffleKey,
             requestLoc.getUniqueId)
         } else {
           partitionLocationInfo.getReplicaLocation(
             shuffleKey,
             requestLoc.getUniqueId)
         }
       if (location == null) {
         location = requestLoc
         val writer = storageManager.createPartitionDataWriter(
           applicationId,
           shuffleId,
           location,
           splitThreshold,
           splitMode,
           partitionType,
           rangeReadFilter,
           userIdentifier,
           partitionSplitEnabled,
           isSegmentGranularityVisible)
         new WorkingPartition(location, writer)
       } else {
         location
   ```



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -32,7 +32,7 @@ import org.roaringbitmap.RoaringBitmap
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{ReduceFileMeta, WorkerInfo, WorkerPartitionLocationInfo}
+import org.apache.celeborn.common.meta.{WorkerInfo, WorkerPartitionLocationInfo}

Review Comment:
   The removal of `ReduceFileMeta` from the import statement suggests it is no longer used in this file. However, without the full file context, it is unclear whether this reflects dead-code removal or whether the import is still needed elsewhere in the file.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -299,6 +258,109 @@ private[deploy] class Controller(
     context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
   }
 
+  private def createWriters(
+      shuffleKey: String,
+      applicationId: String,
+      shuffleId: Int,
+      requestLocs: jList[PartitionLocation],
+      splitThreshold: Long,
+      splitMode: PartitionSplitMode,
+      partitionType: PartitionType,
+      rangeReadFilter: Boolean,
+      userIdentifier: UserIdentifier,
+      partitionSplitEnabled: Boolean,
+      isSegmentGranularityVisible: Boolean,
+      isPrimary: Boolean): jList[PartitionLocation] = {
+    val partitionLocations = new jArrayList[PartitionLocation]()
+    try {
+      def createWriter(partitionLocation: PartitionLocation): PartitionLocation = {
+        createPartitionDataWriter(
+          shuffleKey,
+          applicationId,
+          shuffleId,
+          partitionLocation,
+          splitThreshold,
+          splitMode,
+          partitionType,
+          rangeReadFilter,
+          userIdentifier,
+          partitionSplitEnabled,
+          isSegmentGranularityVisible,
+          isPrimary)
+      }
+      if (createWriterThreadPool == null) {
+        partitionLocations.addAll(requestLocs.asScala.map(createWriter).asJava)
+      } else {
+        partitionLocations.addAll(requestLocs.asScala.map(requestLoc =>
+          createWriterThreadPool.submit(new Callable[PartitionLocation] {
+            override def call(): PartitionLocation = createWriter(requestLoc)
+          })).map(_.get()).asJava)

Review Comment:
   The parallel implementation uses `.map(_.get())` which blocks on each Future 
sequentially, negating the benefits of parallelization. Consider using 
`Future.sequence` or collecting all futures first, then waiting for completion 
to achieve true parallelism.
   ```suggestion
           val futures = requestLocs.asScala.map(requestLoc =>
             createWriterThreadPool.submit(new Callable[PartitionLocation] {
               override def call(): PartitionLocation = createWriter(requestLoc)
             }))
           partitionLocations.addAll(futures.map(_.get()).asJava)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to