This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 586785c88 [CELEBORN-617][FLINK] MapPartitionFileWriter updates
flushing file length
586785c88 is described below
commit 586785c88d488c1e43465c75e588d1dcaafc327b
Author: zhongqiang.czq <[email protected]>
AuthorDate: Thu Jun 8 10:47:36 2023 +0800
[CELEBORN-617][FLINK] MapPartitionFileWriter updates flushing file length
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1519 from zhongqiangczq/mapfilelength.
Authored-by: zhongqiang.czq <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/meta/FileInfo.java | 15 ++++++++++++---
common/src/main/proto/TransportMessages.proto | 1 +
.../apache/celeborn/common/util/PbSerDeUtils.scala | 4 +++-
.../apache/celeborn/tests/flink/WordCountTest.scala | 19 ++++++++++++++++++-
.../service/deploy/worker/storage/FileWriter.java | 5 ++---
.../worker/storage/ReducePartitionFileWriter.java | 3 ++-
.../worker/storage/PartitionFilesSorterSuiteJ.java | 1 +
7 files changed, 39 insertions(+), 9 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 126e8ec09..cffd7cd10 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -47,6 +47,8 @@ public class FileInfo {
private int bufferSize;
private int numSubpartitions;
+ private volatile long bytesFlushed;
+
public FileInfo(String filePath, List<Long> chunkOffsets, UserIdentifier
userIdentifier) {
this(filePath, chunkOffsets, userIdentifier, PartitionType.REDUCE);
}
@@ -68,13 +70,15 @@ public class FileInfo {
UserIdentifier userIdentifier,
PartitionType partitionType,
int bufferSize,
- int numSubpartitions) {
+ int numSubpartitions,
+ long bytesFlushed) {
this.filePath = filePath;
this.chunkOffsets = chunkOffsets;
this.userIdentifier = userIdentifier;
this.partitionType = partitionType;
this.bufferSize = bufferSize;
this.numSubpartitions = numSubpartitions;
+ this.bytesFlushed = bytesFlushed;
}
public FileInfo(String filePath, UserIdentifier userIdentifier,
PartitionType partitionType) {
@@ -106,8 +110,13 @@ public class FileInfo {
return chunkOffsets.get(chunkOffsets.size() - 1);
}
- public synchronized long getFileLength() {
- return chunkOffsets.get(chunkOffsets.size() - 1);
+ public long getFileLength() {
+ return bytesFlushed;
+ }
+
+ public long updateBytesFlushed(int numBytes) {
+ bytesFlushed += numBytes;
+ return bytesFlushed;
}
public File getFile() {
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 42918eefe..d3f26e4e9 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -434,6 +434,7 @@ message PbFileInfo {
int32 partitionType = 4;
int32 bufferSize = 5;
int32 numSubpartitions = 6;
+ int64 bytesFlushed = 7;
}
message PbFileInfoMap {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 0c5c2423e..3011a33cc 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -92,7 +92,8 @@ object PbSerDeUtils {
userIdentifier,
Utils.toPartitionType(pbFileInfo.getPartitionType),
pbFileInfo.getBufferSize,
- pbFileInfo.getNumSubpartitions)
+ pbFileInfo.getNumSubpartitions,
+ pbFileInfo.getBytesFlushed)
def toPbFileInfo(fileInfo: FileInfo): PbFileInfo =
PbFileInfo.newBuilder
@@ -102,6 +103,7 @@ object PbSerDeUtils {
.setPartitionType(fileInfo.getPartitionType.getValue)
.setBufferSize(fileInfo.getBufferSize)
.setNumSubpartitions(fileInfo.getNumSubpartitions)
+ .setBytesFlushed(fileInfo.getFileLength)
.build
@throws[InvalidProtocolBufferException]
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index e18a47cd8..463380855 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -17,6 +17,10 @@
package org.apache.celeborn.tests.flink
+import java.io.File
+
+import scala.collection.JavaConverters._
+
import org.apache.flink.api.common.{ExecutionMode, InputDependencyConstraint,
RuntimeExecutionMode}
import org.apache.flink.configuration.{ConfigConstants, Configuration,
ExecutionOptions, RestOptions}
import org.apache.flink.runtime.jobgraph.JobType
@@ -27,9 +31,11 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
with BeforeAndAfterAll {
+ var workers: collection.Set[Worker] = null
override def beforeAll(): Unit = {
logInfo("test initialized , setup rss mini cluster")
@@ -37,7 +43,7 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
"celeborn.master.host" -> "localhost",
"celeborn.master.port" -> "9097")
val workerConf = Map("celeborn.master.endpoints" -> "localhost:9097")
- setUpMiniCluster(masterConf, workerConf)
+ workers = setUpMiniCluster(masterConf, workerConf)._2
}
override def afterAll(): Unit = {
@@ -73,5 +79,16 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
graph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING)
graph.setJobType(JobType.BATCH)
env.execute(graph)
+ checkFlushingFileLength()
+ }
+
+ private def checkFlushingFileLength(): Unit = {
+ workers.map(worker => {
+ worker.storageManager.workingDirWriters.values().asScala.map(writers => {
+ writers.forEach((fileName, fileWriter) => {
+ assert(new File(fileName).length() ==
fileWriter.getFileInfo.getFileLength)
+ })
+ })
+ })
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index d615ee86d..1ce41c0fd 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -59,7 +59,6 @@ public abstract class FileWriter implements DeviceObserver {
private volatile boolean destroyed;
protected final AtomicInteger numPendingWrites = new AtomicInteger();
- protected long bytesFlushed;
public final Flusher flusher;
private final int flushWorkerIndex;
@@ -162,7 +161,7 @@ public abstract class FileWriter implements DeviceObserver {
}
addTask(task);
flushBuffer = null;
- bytesFlushed += numBytes;
+ fileInfo.updateBytesFlushed(numBytes);
if (!finalFlush) {
takeBuffer();
}
@@ -290,7 +289,7 @@ public abstract class FileWriter implements DeviceObserver {
deviceMonitor.unregisterFileWriter(this);
}
}
- return bytesFlushed;
+ return fileInfo.getFileLength();
}
public synchronized void destroy(IOException ioException) {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index 1e86b5dc2..06db73b0a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -68,6 +68,7 @@ public final class ReducePartitionFileWriter extends
FileWriter {
}
private void maybeSetChunkOffsets(boolean forceSet) {
+ long bytesFlushed = fileInfo.getFileLength();
if (bytesFlushed >= nextBoundary || forceSet) {
fileInfo.addChunkOffset(bytesFlushed);
nextBoundary = bytesFlushed + shuffleChunkSize;
@@ -83,7 +84,7 @@ public final class ReducePartitionFileWriter extends
FileWriter {
// but its size is smaller than the nextBoundary, then the
// chunk offset will not be set after flushing. we should
// set it during FileWriter close.
- return fileInfo.getLastChunkOffset() == bytesFlushed;
+ return fileInfo.getLastChunkOffset() == fileInfo.getFileLength();
}
public synchronized long close() throws IOException {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
index 9d83324c2..6b7b6a084 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
@@ -104,6 +104,7 @@ public class PartitionFilesSorterSuiteJ {
}
originFileLen = channel.size();
fileInfo.getChunkOffsets().add(originFileLen);
+ fileInfo.updateBytesFlushed((int) originFileLen);
System.out.println(
shuffleFile.getAbsolutePath()
+ " filelen "