This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d5378628b [CELEBORN-617][FLINK] MapPartitionFileWriter updates flushing file length
d5378628b is described below

commit d5378628ba76af7c172ee120080c62625011e526
Author: zhongqiang.czq <[email protected]>
AuthorDate: Thu Jun 8 10:47:36 2023 +0800

    [CELEBORN-617][FLINK] MapPartitionFileWriter updates flushing file length
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1519 from zhongqiangczq/mapfilelength.
    
    Authored-by: zhongqiang.czq <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 586785c88d488c1e43465c75e588d1dcaafc327b)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/meta/FileInfo.java     | 15 ++++++++++++---
 common/src/main/proto/TransportMessages.proto         |  1 +
 .../apache/celeborn/common/util/PbSerDeUtils.scala    |  4 +++-
 .../apache/celeborn/tests/flink/WordCountTest.scala   | 19 ++++++++++++++++++-
 .../service/deploy/worker/storage/FileWriter.java     |  5 ++---
 .../worker/storage/ReducePartitionFileWriter.java     |  3 ++-
 .../worker/storage/PartitionFilesSorterSuiteJ.java    |  1 +
 7 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 126e8ec09..cffd7cd10 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -47,6 +47,8 @@ public class FileInfo {
   private int bufferSize;
   private int numSubpartitions;
 
+  private volatile long bytesFlushed;
+
   public FileInfo(String filePath, List<Long> chunkOffsets, UserIdentifier 
userIdentifier) {
     this(filePath, chunkOffsets, userIdentifier, PartitionType.REDUCE);
   }
@@ -68,13 +70,15 @@ public class FileInfo {
       UserIdentifier userIdentifier,
       PartitionType partitionType,
       int bufferSize,
-      int numSubpartitions) {
+      int numSubpartitions,
+      long bytesFlushed) {
     this.filePath = filePath;
     this.chunkOffsets = chunkOffsets;
     this.userIdentifier = userIdentifier;
     this.partitionType = partitionType;
     this.bufferSize = bufferSize;
     this.numSubpartitions = numSubpartitions;
+    this.bytesFlushed = bytesFlushed;
   }
 
   public FileInfo(String filePath, UserIdentifier userIdentifier, 
PartitionType partitionType) {
@@ -106,8 +110,13 @@ public class FileInfo {
     return chunkOffsets.get(chunkOffsets.size() - 1);
   }
 
-  public synchronized long getFileLength() {
-    return chunkOffsets.get(chunkOffsets.size() - 1);
+  public long getFileLength() {
+    return bytesFlushed;
+  }
+
+  public long updateBytesFlushed(int numBytes) {
+    bytesFlushed += numBytes;
+    return bytesFlushed;
   }
 
   public File getFile() {
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 42918eefe..d3f26e4e9 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -434,6 +434,7 @@ message PbFileInfo {
   int32 partitionType = 4;
   int32 bufferSize = 5;
   int32 numSubpartitions = 6;
+  int64 bytesFlushed = 7;
 }
 
 message PbFileInfoMap {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 0c5c2423e..3011a33cc 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -92,7 +92,8 @@ object PbSerDeUtils {
       userIdentifier,
       Utils.toPartitionType(pbFileInfo.getPartitionType),
       pbFileInfo.getBufferSize,
-      pbFileInfo.getNumSubpartitions)
+      pbFileInfo.getNumSubpartitions,
+      pbFileInfo.getBytesFlushed)
 
   def toPbFileInfo(fileInfo: FileInfo): PbFileInfo =
     PbFileInfo.newBuilder
@@ -102,6 +103,7 @@ object PbSerDeUtils {
       .setPartitionType(fileInfo.getPartitionType.getValue)
       .setBufferSize(fileInfo.getBufferSize)
       .setNumSubpartitions(fileInfo.getNumSubpartitions)
+      .setBytesFlushed(fileInfo.getFileLength)
       .build
 
   @throws[InvalidProtocolBufferException]
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index e18a47cd8..463380855 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -17,6 +17,10 @@
 
 package org.apache.celeborn.tests.flink
 
+import java.io.File
+
+import scala.collection.JavaConverters._
+
 import org.apache.flink.api.common.{ExecutionMode, InputDependencyConstraint, 
RuntimeExecutionMode}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
ExecutionOptions, RestOptions}
 import org.apache.flink.runtime.jobgraph.JobType
@@ -27,9 +31,11 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
 
 class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
   with BeforeAndAfterAll {
+  var workers: collection.Set[Worker] = null
 
   override def beforeAll(): Unit = {
     logInfo("test initialized , setup rss mini cluster")
@@ -37,7 +43,7 @@ class WordCountTest extends AnyFunSuite with Logging with 
MiniClusterFeature
       "celeborn.master.host" -> "localhost",
       "celeborn.master.port" -> "9097")
     val workerConf = Map("celeborn.master.endpoints" -> "localhost:9097")
-    setUpMiniCluster(masterConf, workerConf)
+    workers = setUpMiniCluster(masterConf, workerConf)._2
   }
 
   override def afterAll(): Unit = {
@@ -73,5 +79,16 @@ class WordCountTest extends AnyFunSuite with Logging with 
MiniClusterFeature
     
graph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING)
     graph.setJobType(JobType.BATCH)
     env.execute(graph)
+    checkFlushingFileLength()
+  }
+
+  private def checkFlushingFileLength(): Unit = {
+    workers.map(worker => {
+      worker.storageManager.workingDirWriters.values().asScala.map(writers => {
+        writers.forEach((fileName, fileWriter) => {
+          assert(new File(fileName).length() == 
fileWriter.getFileInfo.getFileLength)
+        })
+      })
+    })
   }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index d615ee86d..1ce41c0fd 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -59,7 +59,6 @@ public abstract class FileWriter implements DeviceObserver {
   private volatile boolean destroyed;
 
   protected final AtomicInteger numPendingWrites = new AtomicInteger();
-  protected long bytesFlushed;
 
   public final Flusher flusher;
   private final int flushWorkerIndex;
@@ -162,7 +161,7 @@ public abstract class FileWriter implements DeviceObserver {
         }
         addTask(task);
         flushBuffer = null;
-        bytesFlushed += numBytes;
+        fileInfo.updateBytesFlushed(numBytes);
         if (!finalFlush) {
           takeBuffer();
         }
@@ -290,7 +289,7 @@ public abstract class FileWriter implements DeviceObserver {
         deviceMonitor.unregisterFileWriter(this);
       }
     }
-    return bytesFlushed;
+    return fileInfo.getFileLength();
   }
 
   public synchronized void destroy(IOException ioException) {
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index 1e86b5dc2..06db73b0a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -68,6 +68,7 @@ public final class ReducePartitionFileWriter extends 
FileWriter {
   }
 
   private void maybeSetChunkOffsets(boolean forceSet) {
+    long bytesFlushed = fileInfo.getFileLength();
     if (bytesFlushed >= nextBoundary || forceSet) {
       fileInfo.addChunkOffset(bytesFlushed);
       nextBoundary = bytesFlushed + shuffleChunkSize;
@@ -83,7 +84,7 @@ public final class ReducePartitionFileWriter extends 
FileWriter {
     // but its size is smaller than the nextBoundary, then the
     // chunk offset will not be set after flushing. we should
     // set it during FileWriter close.
-    return fileInfo.getLastChunkOffset() == bytesFlushed;
+    return fileInfo.getLastChunkOffset() == fileInfo.getFileLength();
   }
 
   public synchronized long close() throws IOException {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
index 9d83324c2..6b7b6a084 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
@@ -104,6 +104,7 @@ public class PartitionFilesSorterSuiteJ {
     }
     originFileLen = channel.size();
     fileInfo.getChunkOffsets().add(originFileLen);
+    fileInfo.updateBytesFlushed((int) originFileLen);
     System.out.println(
         shuffleFile.getAbsolutePath()
             + " filelen "

Reply via email to