Copilot commented on code in PR #3387:
URL: https://github.com/apache/celeborn/pull/3387#discussion_r2261800104
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -299,6 +258,109 @@ private[deploy] class Controller(
context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
}
+ private def createWriters(
+ shuffleKey: String,
+ applicationId: String,
+ shuffleId: Int,
+ requestLocs: jList[PartitionLocation],
+ splitThreshold: Long,
+ splitMode: PartitionSplitMode,
+ partitionType: PartitionType,
+ rangeReadFilter: Boolean,
+ userIdentifier: UserIdentifier,
+ partitionSplitEnabled: Boolean,
+ isSegmentGranularityVisible: Boolean,
+ isPrimary: Boolean): jList[PartitionLocation] = {
+ val partitionLocations = new jArrayList[PartitionLocation]()
+ try {
+ def createWriter(partitionLocation: PartitionLocation):
PartitionLocation = {
+ createPartitionDataWriter(
+ shuffleKey,
+ applicationId,
+ shuffleId,
+ partitionLocation,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible,
+ isPrimary)
+ }
+ if (createWriterThreadPool == null) {
+ partitionLocations.addAll(requestLocs.asScala.map(createWriter).asJava)
+ } else {
+ partitionLocations.addAll(requestLocs.asScala.map(requestLoc =>
+ createWriterThreadPool.submit(new Callable[PartitionLocation] {
+ override def call(): PartitionLocation = createWriter(requestLoc)
+ })).map(_.get()).asJava)
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Create FileWriter for $shuffleKey failed.", e)
+ }
+ partitionLocations
+ }
+
+ private def createPartitionDataWriter(
+ shuffleKey: String,
+ applicationId: String,
+ shuffleId: Int,
+ requestLoc: PartitionLocation,
+ splitThreshold: Long,
+ splitMode: PartitionSplitMode,
+ partitionType: PartitionType,
+ rangeReadFilter: Boolean,
+ userIdentifier: UserIdentifier,
+ partitionSplitEnabled: Boolean,
+ isSegmentGranularityVisible: Boolean,
+ isPrimary: Boolean): PartitionLocation = {
+ try {
+ var location =
+ if (isPrimary) {
+ partitionLocationInfo.getPrimaryLocation(
+ shuffleKey,
+ requestLoc.getUniqueId)
+ } else {
+ partitionLocationInfo.getReplicaLocation(
+ shuffleKey,
+ requestLoc.getUniqueId)
+ }
+ if (location == null) {
+ location = requestLoc
+ val writer = storageManager.createPartitionDataWriter(
+ applicationId,
+ shuffleId,
+ location,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible)
+ new WorkingPartition(location, writer)
+ } else {
+ location
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Create FileWriter for $requestLoc $shuffleKey failed.", e)
+ throw e
Review Comment:
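[nitpick] Re-throwing the exception after logging it adds little value: the
exception propagates to the caller anyway, and `createWriters` already catches
and logs it ("Create FileWriter for $shuffleKey failed."), so every failure is
logged twice. Consider removing this try-catch, or, if the `requestLoc` detail
in this message is worth keeping, attaching that context to the propagated
exception instead of logging here.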
```suggestion
var location =
if (isPrimary) {
partitionLocationInfo.getPrimaryLocation(
shuffleKey,
requestLoc.getUniqueId)
} else {
partitionLocationInfo.getReplicaLocation(
shuffleKey,
requestLoc.getUniqueId)
}
if (location == null) {
location = requestLoc
val writer = storageManager.createPartitionDataWriter(
applicationId,
shuffleId,
location,
splitThreshold,
splitMode,
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled,
isSegmentGranularityVisible)
new WorkingPartition(location, writer)
} else {
location
```
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -32,7 +32,7 @@ import org.roaringbitmap.RoaringBitmap
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{ReduceFileMeta, WorkerInfo,
WorkerPartitionLocationInfo}
+import org.apache.celeborn.common.meta.{WorkerInfo,
WorkerPartitionLocationInfo}
Review Comment:
Removing `ReduceFileMeta` from this import implies it is no longer referenced
in this file. Please confirm that no remaining code in `Controller.scala` still
uses it; if any does, the file will fail to compile.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -299,6 +258,109 @@ private[deploy] class Controller(
context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
}
+ private def createWriters(
+ shuffleKey: String,
+ applicationId: String,
+ shuffleId: Int,
+ requestLocs: jList[PartitionLocation],
+ splitThreshold: Long,
+ splitMode: PartitionSplitMode,
+ partitionType: PartitionType,
+ rangeReadFilter: Boolean,
+ userIdentifier: UserIdentifier,
+ partitionSplitEnabled: Boolean,
+ isSegmentGranularityVisible: Boolean,
+ isPrimary: Boolean): jList[PartitionLocation] = {
+ val partitionLocations = new jArrayList[PartitionLocation]()
+ try {
+ def createWriter(partitionLocation: PartitionLocation):
PartitionLocation = {
+ createPartitionDataWriter(
+ shuffleKey,
+ applicationId,
+ shuffleId,
+ partitionLocation,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible,
+ isPrimary)
+ }
+ if (createWriterThreadPool == null) {
+ partitionLocations.addAll(requestLocs.asScala.map(createWriter).asJava)
+ } else {
+ partitionLocations.addAll(requestLocs.asScala.map(requestLoc =>
+ createWriterThreadPool.submit(new Callable[PartitionLocation] {
+ override def call(): PartitionLocation = createWriter(requestLoc)
+ })).map(_.get()).asJava)
Review Comment:
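The parallel implementation uses `.map(_.get())`, which waits on each Future in
turn. Because `requestLocs.asScala` yields a strict `mutable.Buffer`, all tasks
are already submitted before the first `get()` is called, so writers are still
created in parallel. However, the single chained expression obscures that
ordering. Collecting the futures into a named value first, as below, makes the
submit-then-wait sequence explicit. Note that `Future.sequence` applies to
`scala.concurrent.Future`, not to the `java.util.concurrent.Future` returned by
`submit`. Also consider what should happen to the remaining tasks if one
`get()` throws.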
```suggestion
val futures = requestLocs.asScala.map(requestLoc =>
createWriterThreadPool.submit(new Callable[PartitionLocation] {
override def call(): PartitionLocation = createWriter(requestLoc)
}))
partitionLocations.addAll(futures.map(_.get()).asJava)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]