TheodoreLx commented on code in PR #3394:
URL: https://github.com/apache/celeborn/pull/3394#discussion_r2241671317


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala:
##########
@@ -33,7 +33,19 @@ abstract private[worker] class FlushTask(
     val notifier: FlushNotifier,
     val keepBuffer: Boolean,
     val source: AbstractSource) {
-  def flush(): Unit
+  def flush(copyBytes: Array[Byte]): Unit
+
+  def convertBufferToBytes(
+      buffer: CompositeByteBuf,
+      copyBytes: Array[Byte],
+      length: Int): Array[Byte] = {
+    if (copyBytes != null) {
+      buffer.readBytes(copyBytes, 0, length)

Review Comment:
   Take HdfsFlushTask as an example: the capacity of `copyBytes` equals 
`celeborn.worker.flusher.hdfs.buffer.size`. The length of the buffer in 
HdfsFlushTask is generally smaller than that value, so `readBytes` neither 
truncates the data nor reads out of bounds.
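
For the case where the buffer is larger than the preallocated array, a defensive variant could look roughly like the sketch below. This is only an illustration, not the code in this PR: `CopyBufferSketch` and `copyInto` are hypothetical names, and `java.nio.ByteBuffer` stands in for Netty's `CompositeByteBuf` so the snippet runs without extra dependencies.

```scala
import java.nio.ByteBuffer

object CopyBufferSketch {
  // Hypothetical helper, not the PR's implementation. java.nio.ByteBuffer is
  // used as a stand-in for Netty's CompositeByteBuf.
  def copyInto(buffer: ByteBuffer, copyBytes: Array[Byte], length: Int): Array[Byte] = {
    require(
      length >= 0 && length <= buffer.remaining(),
      "length exceeds the readable bytes of the buffer")
    // Reuse the preallocated array only when it can hold `length` bytes;
    // otherwise allocate a fresh array instead of copying out of bounds.
    val target =
      if (copyBytes != null && copyBytes.length >= length) copyBytes
      else new Array[Byte](length)
    buffer.get(target, 0, length)
    target
  }

  def main(args: Array[String]): Unit = {
    val reusable = new Array[Byte](8)

    // Common case: the data fits, so the preallocated array is reused.
    val fits = copyInto(ByteBuffer.wrap(Array[Byte](1, 2, 3)), reusable, 3)
    println(fits eq reusable)

    // Rare case: the data is larger, so a new array is allocated.
    val grown = copyInto(ByteBuffer.wrap(Array.fill[Byte](16)(7)), reusable, 16)
    println(grown.length)
  }
}
```

When the array is reused, only the first `length` bytes are valid; anything after that may be left over from an earlier flush, so the caller has to pass `length` along to the write call.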


