RexXiong commented on code in PR #3085:
URL: https://github.com/apache/celeborn/pull/3085#discussion_r1964941115
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -371,14 +371,14 @@ private[deploy] class Controller(
private def waitMapPartitionRegionFinished(
fileWriter: PartitionDataWriter,
waitTimeout: Long): Unit = {
- fileWriter match {
- case writer: MapPartitionDataWriter =>
- if (writer.checkPartitionRegionFinished(
- waitTimeout)) {
- logDebug(s"CommitFile succeed to waitMapPartitionRegionFinished
${fileWriter.getFile.getAbsolutePath}")
+ fileWriter.getMetaHandler match {
+ case metahHandler: SegmentMapPartitionMetaHandler =>
Review Comment:
metahHandler -> metaHandler
SegmentMapPartitionMetaHandler -> MapPartitionMetaHandler
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -17,615 +17,155 @@
package org.apache.celeborn.service.deploy.worker.storage;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import scala.Tuple4;
+import scala.Option;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.protobuf.GeneratedMessageV3;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.exception.AlreadyClosedException;
-import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.meta.DiskFileInfo;
-import org.apache.celeborn.common.meta.DiskStatus;
-import org.apache.celeborn.common.meta.FileInfo;
-import org.apache.celeborn.common.meta.MemoryFileInfo;
-import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.meta.*;
import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.unsafe.Platform;
-import org.apache.celeborn.common.util.FileChannelUtils;
-import org.apache.celeborn.reflect.DynConstructors;
-import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
-import org.apache.celeborn.service.deploy.worker.WorkerSource;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
-import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.UserCongestionControlContext;
-import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
/*
* Note: Once FlushNotifier.exception is set, the whole file is not available.
* That's fine some of the internal state(e.g. bytesFlushed) may be
inaccurate.
*/
-public abstract class PartitionDataWriter implements DeviceObserver {
+public class PartitionDataWriter implements DeviceObserver {
private static final Logger logger =
LoggerFactory.getLogger(PartitionDataWriter.class);
- private static final long WAIT_INTERVAL_MS = 5;
- // After commit file, there will be only 1 fileinfo left.
- protected DiskFileInfo diskFileInfo = null;
- protected MemoryFileInfo memoryFileInfo = null;
- private FileChannel channel;
- private volatile boolean closed;
- private volatile boolean destroyed;
-
- protected final AtomicInteger numPendingWrites = new AtomicInteger();
-
- public Flusher flusher;
- private int flushWorkerIndex;
-
- protected CompositeByteBuf flushBuffer;
-
- protected final Object flushLock = new Object();
- private final long writerCloseTimeoutMs;
-
- protected long flusherBufferSize;
+ protected TierWriterProxy tierWriterProxy;
protected final DeviceMonitor deviceMonitor;
- protected final AbstractSource source; // metrics
-
private final long splitThreshold;
private final PartitionSplitMode splitMode;
- private final boolean rangeReadFilter;
- protected boolean deleted = false;
- private RoaringBitmap mapIdBitMap = null;
protected final FlushNotifier notifier = new FlushNotifier();
// It's only needed when graceful shutdown is enabled
- private final String shuffleKey;
protected final StorageManager storageManager;
- private final boolean workerGracefulShutdown;
- protected final long memoryFileStorageMaxFileSize;
- protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean();
- protected final String filename;
- protected ByteBufAllocator allocator;
- private final PartitionDataWriterContext writerContext;
- private final long localFlusherBufferSize;
- private final long hdfsFlusherBufferSize;
-
- private final long s3FlusherBufferSize;
- private Exception exception = null;
- private boolean metricsCollectCriticalEnabled;
- private long chunkSize;
- private UserBufferInfo userBufferInfo = null;
-
- protected FileSystem hadoopFs;
-
private UserCongestionControlContext userCongestionControlContext = null;
-
- protected MultipartUploadHandler s3MultipartUploadHandler;
-
- protected int partNumber = 1;
-
- private final int s3MultiplePartUploadMaxRetries;
+ public static String writerString;
public PartitionDataWriter(
StorageManager storageManager,
- AbstractSource workerSource,
CelebornConf conf,
DeviceMonitor deviceMonitor,
PartitionDataWriterContext writerContext,
- boolean supportInMemory)
+ PartitionType partitionType)
throws IOException {
this.storageManager = storageManager;
- this.writerCloseTimeoutMs = conf.workerWriterCloseTimeoutMs();
- this.workerGracefulShutdown = conf.workerGracefulShutdown();
this.splitThreshold = writerContext.getSplitThreshold();
this.deviceMonitor = deviceMonitor;
this.splitMode = writerContext.getPartitionSplitMode();
- this.rangeReadFilter = writerContext.isRangeReadFilter();
- this.shuffleKey = writerContext.getShuffleKey();
- this.memoryFileStorageMaxFileSize =
conf.workerMemoryFileStorageMaxFileSize();
- this.filename = writerContext.getPartitionLocation().getFileName();
- this.writerContext = writerContext;
- this.localFlusherBufferSize = conf.workerFlusherBufferSize();
- this.hdfsFlusherBufferSize = conf.workerHdfsFlusherBufferSize();
- this.s3FlusherBufferSize = conf.workerS3FlusherBufferSize();
- this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
- this.chunkSize = conf.shuffleChunkSize();
- this.s3MultiplePartUploadMaxRetries =
conf.s3MultiplePartUploadMaxRetries();
-
- Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
- storageManager.createFile(writerContext, supportInMemory);
-
- // Reduce partition data writers support memory storage now
- if (supportInMemory && createFileResult._1() != null) {
- this.memoryFileInfo = createFileResult._1();
- this.allocator = storageManager.storageBufferAllocator();
- this.isMemoryShuffleFile.set(true);
- storageManager.registerMemoryPartitionWriter(this,
createFileResult._1());
- } else if (createFileResult._2() != null) {
- this.diskFileInfo = createFileResult._3();
- this.flusher = createFileResult._2();
- this.flushWorkerIndex = this.flusher.getWorkerIndex();
- File workingDir = createFileResult._4();
- this.isMemoryShuffleFile.set(false);
- initFileChannelsForDiskFile();
- storageManager.registerDiskFilePartitionWriter(this, workingDir,
diskFileInfo);
- } else {
- throw new CelebornIOException(
- "Create file failed for location:" +
writerContext.getPartitionLocation().toString());
- }
+ String shuffleKey = writerContext.getShuffleKey();
+ String filename = writerContext.getPartitionLocation().getFileName();
+
+ writerString = shuffleKey + "-" + filename + " partition-writer";
- source = workerSource;
logger.debug("FileWriter {} split threshold {} mode {}", this,
splitThreshold, splitMode);
- if (rangeReadFilter) {
- this.mapIdBitMap = new RoaringBitmap();
- }
- takeBuffer();
if (CongestionController.instance() != null) {
userCongestionControlContext =
CongestionController.instance()
.getUserCongestionContext(writerContext.getUserIdentifier());
}
- }
- public void initFileChannelsForDiskFile() throws IOException {
- if (!this.diskFileInfo.isDFS()) {
- this.flusherBufferSize = localFlusherBufferSize;
- channel =
FileChannelUtils.createWritableFileChannel(this.diskFileInfo.getFilePath());
+ tierWriterProxy = new TierWriterProxy(writerContext, storageManager, conf,
partitionType);
+ if (tierWriterProxy.currentTierWriter() instanceof MemoryTierWriter) {
Review Comment:
should register the PartitionWriter to the StorageManager in the tierWriter
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:
##########
@@ -93,14 +93,13 @@ abstract class TierWriterBase(
}
def close(evict: Boolean = false): Long = {
+ ensureNotClosed()
Review Comment:
add synchronized block
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterProxy.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.protobuf.GeneratedMessageV3
+import io.netty.buffer.ByteBuf
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.protocol.{PartitionType, StorageInfo}
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+
+class TierWriterProxy(
+ partitionDataWriterContext: PartitionDataWriterContext,
+ storageManager: StorageManager,
+ conf: CelebornConf,
+ partitionType: PartitionType) {
+ val memoryFileStorageMaxFileSize = conf.workerMemoryFileStorageMaxFileSize
+ val notifier = new FlushNotifier
+ val numPendingWrites = new AtomicInteger
+ @volatile var currentTierWriter: TierWriterBase = _
+ @volatile var fileClosed = false
+
+ currentTierWriter =
+ storageManager.storagePolicy.createFileWriter(
+ partitionDataWriterContext,
+ partitionType,
+ numPendingWrites,
+ notifier)
+
+ def write(buf: ByteBuf): Unit = this.synchronized {
+ currentTierWriter.ensureNotClosed()
+ if (currentTierWriter.needEvict()) {
+ evict(false)
+ }
+ currentTierWriter.write(buf)
+ }
+
+ def evict(checkClose: Boolean): Unit = this.synchronized {
+ if (checkClose) {
+ if (fileClosed) {
+ return
+ }
+ }
+ val nFile =
+ storageManager.storagePolicy.getEvictedFileWriter(
+ currentTierWriter,
+ partitionDataWriterContext,
+ partitionType,
+ numPendingWrites,
+ notifier)
+ currentTierWriter.evict(nFile)
+ currentTierWriter = nFile
+ }
+
+ def flush(finalFlush: Boolean = false): Unit = this.synchronized {
+ currentTierWriter.flush(finalFlush)
+ }
+
+ def getCurrentFileInfo(): FileInfo = {
+ currentTierWriter.fileInfo
+ }
+
+ def needHardSplitForMemoryFile(): Boolean = {
+ if (!currentTierWriter.isInstanceOf[MemoryTierWriter]) {
+ return false;
+ }
+ !storageManager.localOrDfsStorageAvailable &&
+ (currentTierWriter.fileInfo.getFileLength > memoryFileStorageMaxFileSize ||
+ !MemoryManager.instance.memoryFileStorageAvailable())
+ }
+
+ def getCurrentStorageInfo(): StorageInfo = {
Review Comment:
StorageInfo also needs to set filesize/ChunkOffsets
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1419,11 +1407,12 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
private def checkDiskFull(fileWriter: PartitionDataWriter): Boolean = {
- if (fileWriter.flusher == null || fileWriter.flusher.isInstanceOf[
- HdfsFlusher] || fileWriter.flusher.isInstanceOf[S3Flusher]) {
+ val flusher = fileWriter.getFlusher;
Review Comment:
can be replaced by `if (flusher.isInstanceOf[LocalFlusher]) ...`
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -17,615 +17,155 @@
package org.apache.celeborn.service.deploy.worker.storage;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import scala.Tuple4;
+import scala.Option;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.protobuf.GeneratedMessageV3;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.exception.AlreadyClosedException;
-import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.meta.DiskFileInfo;
-import org.apache.celeborn.common.meta.DiskStatus;
-import org.apache.celeborn.common.meta.FileInfo;
-import org.apache.celeborn.common.meta.MemoryFileInfo;
-import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.meta.*;
import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.unsafe.Platform;
-import org.apache.celeborn.common.util.FileChannelUtils;
-import org.apache.celeborn.reflect.DynConstructors;
-import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
-import org.apache.celeborn.service.deploy.worker.WorkerSource;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
-import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.UserCongestionControlContext;
-import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
/*
* Note: Once FlushNotifier.exception is set, the whole file is not available.
* That's fine some of the internal state(e.g. bytesFlushed) may be
inaccurate.
*/
-public abstract class PartitionDataWriter implements DeviceObserver {
+public class PartitionDataWriter implements DeviceObserver {
private static final Logger logger =
LoggerFactory.getLogger(PartitionDataWriter.class);
- private static final long WAIT_INTERVAL_MS = 5;
- // After commit file, there will be only 1 fileinfo left.
- protected DiskFileInfo diskFileInfo = null;
- protected MemoryFileInfo memoryFileInfo = null;
- private FileChannel channel;
- private volatile boolean closed;
- private volatile boolean destroyed;
-
- protected final AtomicInteger numPendingWrites = new AtomicInteger();
-
- public Flusher flusher;
- private int flushWorkerIndex;
-
- protected CompositeByteBuf flushBuffer;
-
- protected final Object flushLock = new Object();
- private final long writerCloseTimeoutMs;
-
- protected long flusherBufferSize;
+ protected TierWriterProxy tierWriterProxy;
protected final DeviceMonitor deviceMonitor;
- protected final AbstractSource source; // metrics
-
private final long splitThreshold;
private final PartitionSplitMode splitMode;
- private final boolean rangeReadFilter;
- protected boolean deleted = false;
- private RoaringBitmap mapIdBitMap = null;
protected final FlushNotifier notifier = new FlushNotifier();
// It's only needed when graceful shutdown is enabled
- private final String shuffleKey;
protected final StorageManager storageManager;
- private final boolean workerGracefulShutdown;
- protected final long memoryFileStorageMaxFileSize;
- protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean();
- protected final String filename;
- protected ByteBufAllocator allocator;
- private final PartitionDataWriterContext writerContext;
- private final long localFlusherBufferSize;
- private final long hdfsFlusherBufferSize;
-
- private final long s3FlusherBufferSize;
- private Exception exception = null;
- private boolean metricsCollectCriticalEnabled;
- private long chunkSize;
- private UserBufferInfo userBufferInfo = null;
-
- protected FileSystem hadoopFs;
-
private UserCongestionControlContext userCongestionControlContext = null;
-
- protected MultipartUploadHandler s3MultipartUploadHandler;
-
- protected int partNumber = 1;
-
- private final int s3MultiplePartUploadMaxRetries;
+ public static String writerString;
Review Comment:
why use static variable for the writer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]