This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ae4022235 [CELEBORN-2047] Support MapPartitionData on DFS
ae4022235 is described below
commit ae40222351cbeb1a9bdd398d461255a0739f3cac
Author: SteNicholas <[email protected]>
AuthorDate: Sat Jul 26 22:11:32 2025 +0800
[CELEBORN-2047] Support MapPartitionData on DFS
### What changes were proposed in this pull request?
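Support `MapPartitionData` on DFS (HDFS, S3 and OSS) by introducing a `PartitionDataReader` abstraction with local-disk (`LocalPartitionDataReader`) and DFS (`DfsPartitionDataReader`) implementations.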
### Why are the changes needed?
`MapPartitionData` currently supports only local disks, not DFS.
Supporting `MapPartitionData` on DFS brings MapPartition mode in line with the
capabilities of ReducePartition mode.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added `WordCountTestWithHDFS`, which runs the Flink word-count test with HDFS storage backed by a `MiniDFSCluster`.
Closes #3349 from SteNicholas/CELEBORN-2047.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/common/meta/DiskFileInfo.java | 4 +
.../org/apache/celeborn/common/meta/FileInfo.java | 4 +
.../org/apache/celeborn/common/util/Utils.scala | 8 +-
project/CelebornBuild.scala | 8 +-
tests/flink-it/pom.xml | 31 ++++++
.../celeborn/tests/flink/WordCountTest.scala | 45 ++++++++-
.../worker/storage/DfsPartitionDataReader.java | 108 +++++++++++++++++++++
.../worker/storage/LocalPartitionDataReader.java | 101 +++++++++++++++++++
.../deploy/worker/storage/MapPartitionData.java | 15 +--
.../worker/storage/MapPartitionDataReader.java | 92 +++++-------------
.../deploy/worker/storage/PartitionDataReader.java | 85 ++++++++++++++++
.../worker/storage/PartitionMetaHandler.scala | 29 +-----
.../deploy/worker/storage/StorageManager.scala | 4 +-
.../service/deploy/worker/storage/TierWriter.scala | 52 +++++-----
.../service/deploy/MiniClusterFeature.scala | 6 +-
15 files changed, 452 insertions(+), 140 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index c7161483b..ab798eca3 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -164,4 +164,8 @@ public class DiskFileInfo extends FileInfo {
public boolean isDFS() {
return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) ||
Utils.isHdfsPath(filePath);
}
+
+ public StorageInfo.Type getStorageType() {
+ return storageType;
+ }
}
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index e81b72936..e8511f1bf 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -108,4 +108,8 @@ public abstract class FileInfo {
}
public abstract String getFilePath();
+
+ public boolean isReduceFileMeta() {
+ return isReduceFileMeta;
+ }
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3fd090e77..aa719cff1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -40,6 +40,7 @@ import com.google.protobuf.{ByteString, GeneratedMessageV3}
import io.netty.channel.unix.Errors.NativeIoException
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.hadoop.fs.FSDataInputStream
import org.roaringbitmap.RoaringBitmap
import org.apache.celeborn.common.CelebornConf
@@ -1175,12 +1176,11 @@ object Utils extends Logging {
}
@throws[IOException]
- def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = {
- val remainingBytes = fileChannel.size - fileChannel.position
+ def checkFileIntegrity(remainingBytes: Long, length: Int, filePath: String):
Unit = {
if (remainingBytes < length) {
logError(
- s"File remaining bytes not not enough, remaining: ${remainingBytes},
wanted: ${length}.")
- throw new RuntimeException(s"File is corrupted ${fileChannel}")
+ s"File remaining bytes not not enough, remaining: $remainingBytes,
wanted: $length.")
+ throw new RuntimeException(s"File is corrupted $filePath")
}
}
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index cdce5973b..760aaf8fe 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -268,6 +268,8 @@ object Dependencies {
ExclusionRule("org.apache.httpcomponents", "httpclient"),
ExclusionRule("org.slf4j", "slf4j-log4j12")
)
+ val hadoopAuth = "org.apache.hadoop" % "hadoop-auth" % hadoopVersion
+ val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
val picocli = "info.picocli" % "picocli" % picocliVersion
@@ -1284,7 +1286,11 @@ trait FlinkClientProjects {
"org.apache.flink" % "flink-runtime" % flinkVersion % "test",
flinkStreamingDependency,
flinkClientsDependency,
- flinkRuntimeWebDependency
+ flinkRuntimeWebDependency,
+ Dependencies.hadoopCommon % "test",
+ Dependencies.hadoopAuth % "test",
+ Dependencies.hadoopHdfs % "test->test;compile->compile",
+ Dependencies.jerseyServer % "test",
) ++ commonUnitTestDependencies,
(Test / envVars) += ("FLINK_VERSION", flinkVersion)
)
diff --git a/tests/flink-it/pom.xml b/tests/flink-it/pom.xml
index 8664fbe94..b4ef70ea5 100644
--- a/tests/flink-it/pom.xml
+++ b/tests/flink-it/pom.xml
@@ -91,5 +91,36 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index cc75a73f4..fa8860efc 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -18,6 +18,7 @@
package org.apache.celeborn.tests.flink
import java.io.File
+import java.nio.file.Files
import scala.collection.JavaConverters._
@@ -26,13 +27,17 @@ import org.apache.flink.configuration.{Configuration,
ExecutionOptions}
import org.apache.flink.runtime.jobgraph.JobType
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
+import org.apache.flink.util.OperatingSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.MiniDFSCluster
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.CelebornConf.{AUTH_ENABLED,
INTERNAL_PORT_ENABLED}
+import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES,
AUTH_ENABLED, HDFS_DIR, INTERNAL_PORT_ENABLED,
WORKER_STORAGE_CREATE_FILE_POLICY}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.FallbackPolicy
+import org.apache.celeborn.rest.v1.model.PartitionLocationData.StorageEnum
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker
@@ -132,3 +137,41 @@ class WordCountTestWithAuthentication extends
WordCountTestBase {
override protected def getWorkerConf: Map[String, String] = authConfig
override protected def getClientConf: Map[String, String] =
Map(AUTH_ENABLED.key -> "true")
}
+
+class WordCountTestWithHDFS extends WordCountTestBase {
+
+ private var basePath: Path = _
+ private var hdfsCluster: MiniDFSCluster = _
+
+ override protected def getMasterConf: Map[String, String] = Map()
+ override protected def getWorkerConf: Map[String, String] = hdfsConfig
+ override protected def getClientConf: Map[String, String] = hdfsConfig
+
+ override def createWorker(map: Map[String, String]): Worker = {
+ super.createWorker(map, null)
+ }
+
+ override def beforeAll(): Unit = {
+ assume(!OperatingSystem.isWindows)
+ val hdConf = new org.apache.hadoop.conf.Configuration()
+ val tmpDir = Files.createTempDirectory("celeborn-")
+ tmpDir.toFile.deleteOnExit()
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.toString)
+ hdfsCluster = new MiniDFSCluster.Builder(hdConf).build
+ basePath = new Path(hdfsCluster.getFileSystem.getUri.toString + "/test")
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ if (hdfsCluster != null) {
+ hdfsCluster.getFileSystem.delete(basePath, true)
+ hdfsCluster.shutdown()
+ }
+ }
+
+ private def hdfsConfig = Map(
+ ACTIVE_STORAGE_TYPES.key -> StorageEnum.HDFS.getValue,
+ WORKER_STORAGE_CREATE_FILE_POLICY.key -> StorageEnum.HDFS.getValue,
+ HDFS_DIR.key -> basePath.toString)
+}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
new file mode 100644
index 000000000..c461e8366
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.celeborn.common.meta.DiskFileInfo;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class DfsPartitionDataReader extends PartitionDataReader {
+
+ private final FSDataInputStream dataInputStream;
+ private final FSDataInputStream indexInputStream;
+
+ public DfsPartitionDataReader(
+ DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer)
throws IOException {
+ super(fileInfo, headerBuffer, indexBuffer);
+ FileSystem fileSystem =
+ StorageManager.hadoopFs()
+ .get(
+ fileInfo.isHdfs()
+ ? StorageInfo.Type.HDFS
+ : fileInfo.isS3() ? StorageInfo.Type.S3 :
StorageInfo.Type.OSS);
+ this.dataInputStream = fileSystem.open(fileInfo.getDfsPath());
+ this.indexInputStream = fileSystem.open(fileInfo.getDfsIndexPath());
+ this.dataFileSize =
fileSystem.getFileStatus(fileInfo.getDfsPath()).getLen();
+ this.indexFileSize =
fileSystem.getFileStatus(fileInfo.getDfsIndexPath()).getLen();
+ }
+
+ @Override
+ public void readIndexBuffer(long targetPosition) throws IOException {
+ indexInputStream.seek(targetPosition);
+ readHeaderOrIndexBuffer(
+ indexInputStream,
+ indexBuffer,
+ indexFileSize,
+ indexBuffer.capacity(),
+ fileInfo.getIndexPath());
+ }
+
+ @Override
+ public void position(long targetPosition) throws IOException {
+ dataInputStream.seek(targetPosition);
+ }
+
+ @Override
+ public void readHeaderBuffer(int headerSize) throws IOException {
+ readHeaderOrIndexBuffer(
+ dataInputStream, headerBuffer, dataFileSize, headerSize,
fileInfo.getFilePath());
+ }
+
+ @Override
+ public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length,
String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - dataInputStream.getPos(), length,
filePath);
+ ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
+ while (tmpBuffer.hasRemaining()) {
+ dataInputStream.read(tmpBuffer);
+ }
+ tmpBuffer.flip();
+ buf.writeBytes(tmpBuffer);
+ }
+
+ @Override
+ public long position() throws IOException {
+ return dataInputStream.getPos();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(dataInputStream);
+ IOUtils.closeQuietly(indexInputStream);
+ }
+
+ private void readHeaderOrIndexBuffer(
+ FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int
length, String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - inputStream.getPos(), length,
filePath);
+ buffer.clear();
+ buffer.limit(length);
+ while (buffer.hasRemaining()) {
+ inputStream.read(buffer);
+ }
+ buffer.flip();
+ }
+}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
new file mode 100644
index 000000000..c5c47aedf
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.celeborn.common.meta.DiskFileInfo;
+import org.apache.celeborn.common.util.FileChannelUtils;
+import org.apache.celeborn.common.util.Utils;
+
+public class LocalPartitionDataReader extends PartitionDataReader {
+
+ private final FileChannel dataFileChanel;
+ private final FileChannel indexFileChannel;
+
+ public LocalPartitionDataReader(
+ DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer)
throws IOException {
+ super(fileInfo, headerBuffer, indexBuffer);
+ this.dataFileChanel =
FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
+ this.indexFileChannel =
FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
+ this.dataFileSize = dataFileChanel.size();
+ this.indexFileSize = indexFileChannel.size();
+ }
+
+ @Override
+ public void readIndexBuffer(long targetPosition) throws IOException {
+ indexFileChannel.position(targetPosition);
+ readHeaderOrIndexBuffer(
+ indexFileChannel,
+ indexBuffer,
+ indexFileSize,
+ indexBuffer.capacity(),
+ fileInfo.getIndexPath());
+ }
+
+ @Override
+ public void position(long targetPosition) throws IOException {
+ dataFileChanel.position(targetPosition);
+ }
+
+ @Override
+ public void readHeaderBuffer(int headerSize) throws IOException {
+ readHeaderOrIndexBuffer(
+ dataFileChanel, headerBuffer, dataFileSize, headerSize,
fileInfo.getFilePath());
+ }
+
+ @Override
+ public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length,
String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - dataFileChanel.position(), length,
filePath);
+ ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
+ while (tmpBuffer.hasRemaining()) {
+ dataFileChanel.read(tmpBuffer);
+ }
+ tmpBuffer.flip();
+ buf.writeBytes(tmpBuffer);
+ }
+
+ @Override
+ public long position() throws IOException {
+ return dataFileChanel.position();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(dataFileChanel);
+ IOUtils.closeQuietly(indexFileChannel);
+ }
+
+ private void readHeaderOrIndexBuffer(
+ FileChannel channel, ByteBuffer buffer, long fileSize, int length,
String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - channel.position(), length, filePath);
+ buffer.clear();
+ buffer.limit(length);
+ while (buffer.hasRemaining()) {
+ channel.read(buffer);
+ }
+ buffer.flip();
+ }
+}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index d88370edf..66d3be67b 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -18,7 +18,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
-import java.nio.channels.FileChannel;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,13 +27,11 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.meta.MapFileMeta;
-import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
@@ -49,9 +46,6 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
protected final ExecutorService readExecutor;
protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
JavaUtils.newConcurrentHashMap();
- private FileChannel dataFileChanel;
- private FileChannel indexChannel;
- private long indexSize;
private volatile boolean isReleased = false;
private final BufferQueue bufferQueue = new BufferQueue();
private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@@ -95,9 +89,6 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
threadsPerMountPoint,
String.format("worker-map-partition-%s-reader",
mapFileMeta.getMountPoint()),
false));
- this.dataFileChanel =
FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
- this.indexChannel =
FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
- this.indexSize = indexChannel.size();
MemoryManager.instance().addReadBufferTargetChangeListener(this);
}
@@ -174,7 +165,7 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
}
protected void openReader(MapPartitionDataReader reader) throws IOException {
- reader.open(dataFileChanel, indexChannel, indexSize);
+ reader.open();
}
public synchronized void readBuffers() {
@@ -252,8 +243,8 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
bufferQueue.release();
isReleased = true;
- IOUtils.closeQuietly(dataFileChanel);
- IOUtils.closeQuietly(indexChannel);
+ readers.values().forEach(MapPartitionDataReader::close);
+ readers.clear();
MemoryManager.instance().removeReadBufferTargetChangeListener(this);
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 8f8a2c353..a0a5ce506 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -95,13 +94,6 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
@GuardedBy("lock")
protected boolean errorNotified;
- private FileChannel dataFileChannel;
-
- // The size of the data file, it is initialized in the open method and
remains unchanged
- // afterward.
- private long dataFileChannelSize;
- private FileChannel indexFileChannel;
-
private Channel associatedChannel;
private Runnable recycleStream;
@@ -109,6 +101,8 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
protected AtomicInteger numInUseBuffers = new AtomicInteger(0);
private boolean isOpen = false;
+ private PartitionDataReader partitionDataReader;
+
public MapPartitionDataReader(
int startPartitionIndex,
int endPartitionIndex,
@@ -132,15 +126,16 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
this.readFinished = false;
}
- public void open(FileChannel dataFileChannel, FileChannel indexFileChannel,
long indexSize)
- throws IOException {
+ public void open() throws IOException {
if (!isOpen) {
- this.dataFileChannel = dataFileChannel;
- this.dataFileChannelSize = dataFileChannel.size();
- this.indexFileChannel = indexFileChannel;
+ this.partitionDataReader =
+ fileInfo.isDFS()
+ ? new DfsPartitionDataReader(fileInfo, headerBuffer, indexBuffer)
+ : new LocalPartitionDataReader(fileInfo, headerBuffer,
indexBuffer);
// index is (offset,length)
long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long)
INDEX_ENTRY_SIZE;
- this.numRegions = Utils.checkedDownCast(indexSize / indexRegionSize);
+ this.numRegions =
+ Utils.checkedDownCast(partitionDataReader.getIndexFileSize() /
indexRegionSize);
updateConsumingOffset();
isOpen = true;
@@ -285,44 +280,6 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
return mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
}
- protected void readHeaderOrIndexBuffer(FileChannel channel, ByteBuffer
buffer, int length)
- throws IOException {
- Utils.checkFileIntegrity(channel, length);
- buffer.clear();
- buffer.limit(length);
- while (buffer.hasRemaining()) {
- channel.read(buffer);
- }
- buffer.flip();
- }
-
- protected void readBufferIntoReadBuffer(FileChannel channel, ByteBuf buf,
int length)
- throws IOException {
- Utils.checkFileIntegrity(channel, length);
- ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
- while (tmpBuffer.hasRemaining()) {
- channel.read(tmpBuffer);
- }
- tmpBuffer.flip();
- buf.writeBytes(tmpBuffer);
- }
-
- protected int readBuffer(
- String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer,
int headerSize)
- throws IOException {
- readHeaderOrIndexBuffer(channel, header, headerSize);
- // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total
Compressed Length(4)
- // we need size here,so we read length directly
- int bufferLength = header.getInt(12);
- if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
- logger.error("Incorrect buffer header, buffer length: {}.",
bufferLength);
- throw new FileCorruptedException("File " + filename + " is corrupted");
- }
- buffer.writeBytes(header);
- readBufferIntoReadBuffer(channel, buffer, bufferLength);
- return bufferLength + headerSize;
- }
-
protected void updateConsumingOffset() throws IOException {
while (currentPartitionRemainingBytes == 0
&& (currentDataRegion < numRegions - 1 || numRemainingPartitions > 0))
{
@@ -331,10 +288,10 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
numRemainingPartitions = endPartitionIndex - startPartitionIndex + 1;
// read the target index entry to the target index buffer
- indexFileChannel.position(
+ long targetPosition =
currentDataRegion * getIndexRegionSize()
- + (long) startPartitionIndex * INDEX_ENTRY_SIZE);
- readHeaderOrIndexBuffer(indexFileChannel, indexBuffer,
indexBuffer.capacity());
+ + (long) startPartitionIndex * INDEX_ENTRY_SIZE;
+ partitionDataReader.readIndexBuffer(targetPosition);
}
// get the data file offset and the data size
@@ -345,13 +302,14 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
logger.debug(
"readBuffer updateConsumingOffset, {}, {}, {}, {}",
streamId,
- dataFileChannelSize,
+ partitionDataReader.getDataFileSize(),
dataConsumingOffset,
currentPartitionRemainingBytes);
// if these checks fail, the partition file must be corrupted
if (dataConsumingOffset < 0
- || dataConsumingOffset + currentPartitionRemainingBytes >
dataFileChannelSize
+ || dataConsumingOffset + currentPartitionRemainingBytes
+ > partitionDataReader.getDataFileSize()
|| currentPartitionRemainingBytes < 0) {
throw new FileCorruptedException("File " + fileInfo.getFilePath() + "
is corrupted");
}
@@ -360,17 +318,9 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
private synchronized boolean readBuffer(ByteBuf buffer) throws IOException {
try {
- dataFileChannel.position(dataConsumingOffset);
-
- int readSize =
- readBuffer(
- fileInfo.getFilePath(),
- dataFileChannel,
- headerBuffer,
- buffer,
- headerBuffer.capacity());
+ int readSize = partitionDataReader.readBuffer(buffer,
dataConsumingOffset);
currentPartitionRemainingBytes -= readSize;
- dataConsumingOffset = dataFileChannel.position();
+ dataConsumingOffset = partitionDataReader.position();
logger.debug(
"readBuffer data: {}, {}, {}, {}, {}, {}",
@@ -388,7 +338,7 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
logger.debug(
"readBuffer end, {}, {}, {}, {}",
streamId,
- dataFileChannelSize,
+ partitionDataReader.getDataFileSize(),
dataConsumingOffset,
currentPartitionRemainingBytes);
int prevDataRegion = currentDataRegion;
@@ -399,7 +349,7 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
logger.debug(
"readBuffer run: {}, {}, {}, {}",
streamId,
- dataFileChannelSize,
+ partitionDataReader.getDataFileSize(),
dataConsumingOffset,
currentPartitionRemainingBytes);
return true;
@@ -557,4 +507,8 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
return !isReleased && !readFinished;
}
}
+
+ public void close() {
+ partitionDataReader.close();
+ }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
new file mode 100644
index 000000000..48b08d741
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.FileCorruptedException;
+import org.apache.celeborn.common.meta.DiskFileInfo;
+
+public abstract class PartitionDataReader {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PartitionDataReader.class);
+
+ protected final DiskFileInfo fileInfo;
+ protected final ByteBuffer headerBuffer;
+ protected final ByteBuffer indexBuffer;
+
+ protected long dataFileSize;
+ protected long indexFileSize;
+
+ public PartitionDataReader(
+ DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) {
+ this.fileInfo = fileInfo;
+ this.headerBuffer = headerBuffer;
+ this.indexBuffer = indexBuffer;
+ }
+
+ public abstract void readIndexBuffer(long targetPosition) throws IOException;
+
+ public abstract void position(long targetPosition) throws IOException;
+
+ public abstract void readHeaderBuffer(int headSize) throws IOException;
+
+ public abstract void readBufferIntoReadBuffer(
+ ByteBuf buf, long fileSize, int length, String filePath) throws
IOException;
+
+ public abstract long position() throws IOException;
+
+ public abstract void close();
+
+ public int readBuffer(ByteBuf buffer, long dataConsumingOffset) throws
IOException {
+ position(dataConsumingOffset);
+ int headerSize = headerBuffer.capacity();
+ readHeaderBuffer(headerSize);
+ // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total
Compressed Length(4)
+ // we need size here,so we read length directly
+ int bufferLength = headerBuffer.getInt(12);
+ if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
+ LOG.error("Incorrect buffer header, buffer length: {}.", bufferLength);
+ throw new FileCorruptedException(
+ String.format("File %s is corrupted", fileInfo.getFilePath()));
+ }
+ buffer.writeBytes(headerBuffer);
+ readBufferIntoReadBuffer(buffer, dataFileSize, bufferLength,
fileInfo.getFilePath());
+ return bufferLength + headerSize;
+ }
+
+ public long getDataFileSize() {
+ return dataFileSize;
+ }
+
+ public long getIndexFileSize() {
+ return indexFileSize;
+ }
+}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
index 0eeaa5e70..902d720ee 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem
 import org.roaringbitmap.RoaringBitmap
 import org.slf4j.{Logger, LoggerFactory}
 
-import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta}
-import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart}
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, ReduceFileMeta}
+import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart, StorageInfo}
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.FileChannelUtils
 
@@ -94,7 +94,7 @@ trait PartitionMetaHandler {
 class MapPartitionMetaHandler(
     diskFileInfo: DiskFileInfo,
     notifier: FlushNotifier) extends PartitionMetaHandler {
-  lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get()
+  lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get(diskFileInfo.getStorageType)
   val logger: Logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
   val fileMeta: MapFileMeta = diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta]
   var numSubpartitions = 0
@@ -286,28 +286,7 @@ class MapPartitionMetaHandler(
   }
 
   override def afterClose(): Unit = {
-    // TODO: force flush the index file channel in scenarios which the upstream task writes and
-    // downstream task reads simultaneously, such as flink hybrid shuffle
-    if (indexBuffer != null) {
-      logger.debug(s"flushIndex start:${diskFileInfo.getIndexPath}")
-      val startTime = System.currentTimeMillis
-      indexBuffer.flip
-      notifier.checkException()
-      try {
-        if (indexBuffer.hasRemaining) {
-          // map partition synchronously writes file index
-          if (indexChannel != null) while (indexBuffer.hasRemaining) indexChannel.write(indexBuffer)
-          else if (diskFileInfo.isDFS) {
-            val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
-            dfsStream.write(indexBuffer.array)
-            dfsStream.close()
-          }
-        }
-        indexBuffer.clear
-      } finally logger.debug(
-        s"flushIndex end:${diskFileInfo.getIndexPath}, " +
-          s"cost:${System.currentTimeMillis - startTime}")
-    }
+    flushIndex()
   }
 
   override def beforeWrite(bytes: ByteBuf): Unit = {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 3070e4b94..b71110947 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -819,7 +819,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         }
       }
     }
-    if (null != diskOperators) {
+    if (CollectionUtils.isNotEmpty(diskOperators)) {
       if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
         cleanupExpiredShuffleKey(shuffleKeySet(), false)
       }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 7c3f1b966..fa0934efb 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -494,7 +494,7 @@ class DfsTierWriter(
     notifier: FlushNotifier,
     flusher: Flusher,
     source: AbstractSource,
-    hdfsFileInfo: DiskFileInfo,
+    dfsFileInfo: DiskFileInfo,
     storageType: StorageInfo.Type,
     partitionDataWriterContext: PartitionDataWriterContext,
     storageManager: StorageManager)
@@ -503,7 +503,7 @@ class DfsTierWriter(
     metaHandler,
     numPendingWrites,
     notifier,
-    hdfsFileInfo,
+    dfsFileInfo,
     source,
     storageType,
     partitionDataWriterContext.getPartitionLocation.getFileName,
@@ -520,21 +520,21 @@ class DfsTierWriter(
   var partNumber: Int = 1
 
   this.flusherBufferSize =
-    if (hdfsFileInfo.isS3()) {
+    if (dfsFileInfo.isS3()) {
       conf.workerS3FlusherBufferSize
-    } else if (hdfsFileInfo.isOSS()) {
+    } else if (dfsFileInfo.isOSS()) {
       conf.workerOssFlusherBufferSize
     } else {
       conf.workerHdfsFlusherBufferSize
     }
 
   try {
-    hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
-    if (hdfsFileInfo.isS3) {
+    hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+    if (dfsFileInfo.isS3) {
       val uri = hadoopFs.getUri
       val bucketName = uri.getHost
-      val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
-      val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
+      val index = dfsFileInfo.getFilePath.indexOf(bucketName)
+      val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
       this.s3MultipartUploadHandler =
         TierWriterHelper.getS3MultipartUploadHandler(
           hadoopFs,
@@ -544,7 +544,7 @@ class DfsTierWriter(
           conf.s3MultiplePartUploadBaseDelay,
           conf.s3MultiplePartUploadMaxBackoff)
       s3MultipartUploadHandler.startUpload()
-    } else if (hdfsFileInfo.isOSS) {
+    } else if (dfsFileInfo.isOSS) {
       val configuration = hadoopFs.getConf
       val ossEndpoint = configuration.get("fs.oss.endpoint")
       val ossAccessKey = configuration.get("fs.oss.accessKeyId")
@@ -552,8 +552,8 @@ class DfsTierWriter(
 
       val uri = hadoopFs.getUri
       val bucketName = uri.getHost
-      val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
-      val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
+      val index = dfsFileInfo.getFilePath.indexOf(bucketName)
+      val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
       this.ossMultipartUploadHandler =
         TierWriterHelper.getOssMultipartUploadHandler(
           ossEndpoint,
@@ -572,7 +572,7 @@ class DfsTierWriter(
         case ex: InterruptedException =>
           throw new RuntimeException(ex)
       }
-      hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
+      hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
   }
 
   storageManager.registerDiskFilePartitionWriter(
@@ -586,9 +586,9 @@ class DfsTierWriter(
 
   override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = {
     notifier.numPendingFlushes.incrementAndGet()
-    if (hdfsFileInfo.isHdfs) {
-      new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, true, source)
-    } else if (hdfsFileInfo.isOSS) {
+    if (dfsFileInfo.isHdfs) {
+      new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source)
+    } else if (dfsFileInfo.isOSS) {
       val flushTask = new OssFlushTask(
         flushBuffer,
         notifier,
@@ -641,17 +641,19 @@ class DfsTierWriter(
   }
 
   override def closeStreams(): Unit = {
-    if (hadoopFs.exists(hdfsFileInfo.getDfsPeerWriterSuccessPath)) {
-      hadoopFs.delete(hdfsFileInfo.getDfsPath, false)
+    if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
+      hadoopFs.delete(dfsFileInfo.getDfsPath, false)
       deleted = true
     } else {
-      hadoopFs.create(hdfsFileInfo.getDfsWriterSuccessPath).close()
-      val indexOutputStream = hadoopFs.create(hdfsFileInfo.getDfsIndexPath)
-      indexOutputStream.writeInt(hdfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
-      for (offset <- hdfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
-        indexOutputStream.writeLong(offset)
+      hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
+      if (dfsFileInfo.isReduceFileMeta) {
+        val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
+        indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
+        for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
+          indexOutputStream.writeLong(offset)
+        }
+        indexOutputStream.close()
       }
-      indexOutputStream.close()
     }
     if (s3MultipartUploadHandler != null) {
       s3MultipartUploadHandler.complete()
@@ -664,12 +666,12 @@ class DfsTierWriter(
   }
 
   override def notifyFileCommitted(): Unit =
-    storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo)
+    storageManager.notifyFileInfoCommitted(shuffleKey, filename, dfsFileInfo)
 
   override def closeResource(): Unit = {}
 
   override def cleanLocalOrDfsFiles(): Unit = {
-    hdfsFileInfo.deleteAllFiles(hadoopFs)
+    dfsFileInfo.deleteAllFiles(hadoopFs)
   }
 
   override def takeBufferInternal(): CompositeByteBuf = {
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 8c3a47a07..95d69fc12 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
@@ -151,7 +153,9 @@ trait MiniClusterFeature extends Logging {
   def createWorker(map: Map[String, String], storageDir: String): Worker = {
     logInfo("start create worker for mini cluster")
     val conf = new CelebornConf()
-    conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
+    if (StringUtils.isNotEmpty(storageDir)) {
+      conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
+    }
     conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
     conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
     conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")