This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit d82f37fd4cc0743ed159d47f31c892fec73c6e57 Author: xxx <[email protected]> AuthorDate: Tue Aug 19 16:38:19 2025 +0800 [CELEBORN-2108] Remove redundant PartitionType ### What changes were proposed in this pull request? Remove redundant `PartitionType`. ### Why are the changes needed? `PartitionType` is included in `PartitionDataWriterContext`, therefore it is not necessary to use `PartitionType` as method parameter. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI. Closes #3422 from xy2953396112/remove_useless_partition_type. Authored-by: xxx <[email protected]> Signed-off-by: SteNicholas <[email protected]> (cherry picked from commit b537798e37be1e5d7e905af6c7a2df905f1b0da5) Signed-off-by: SteNicholas <[email protected]> --- .../deploy/worker/storage/PartitionDataWriter.java | 13 ++---- .../deploy/worker/storage/StorageManager.scala | 3 +- .../deploy/worker/storage/StoragePolicy.scala | 7 +--- .../storage/PartitionDataWriterSuiteUtils.java | 9 ++-- .../local/DiskMapPartitionDataWriterSuiteJ.java | 3 +- .../local/DiskReducePartitionDataWriterSuiteJ.java | 39 ++++++----------- .../MemoryReducePartitionDataWriterSuiteJ.java | 49 +++++++--------------- .../storage/storagePolicy/StoragePolicyCase1.scala | 2 +- .../storage/storagePolicy/StoragePolicyCase2.scala | 2 +- .../storage/storagePolicy/StoragePolicyCase3.scala | 4 +- .../storage/storagePolicy/StoragePolicyCase4.scala | 2 +- 11 files changed, 43 insertions(+), 90 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index e0f16c01a..35dfdc331 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -34,7 +34,6 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.meta.*; import org.apache.celeborn.common.metrics.source.AbstractSource; import org.apache.celeborn.common.protocol.PartitionSplitMode; -import org.apache.celeborn.common.protocol.PartitionType; import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.service.deploy.worker.WorkerSource; import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController; @@ -54,7 +53,6 @@ public class PartitionDataWriter implements DeviceObserver { private final AtomicInteger numPendingWrites = new AtomicInteger(0); private final PartitionDataWriterContext writerContext; protected final AbstractSource source; // metrics - private final PartitionType partitionType; private final String writerString; private final StorageManager storageManager; private final FlushNotifier notifier = new FlushNotifier(); @@ -67,8 +65,7 @@ public class PartitionDataWriter implements DeviceObserver { AbstractSource workerSource, CelebornConf conf, DeviceMonitor deviceMonitor, - PartitionDataWriterContext writerContext, - PartitionType partitionType) { + PartitionDataWriterContext writerContext) { memoryFileStorageMaxFileSize = conf.workerMemoryFileStorageMaxFileSize(); this.writerContext = writerContext; this.source = workerSource; @@ -90,11 +87,8 @@ public class PartitionDataWriter implements DeviceObserver { writerContext.setPartitionDataWriter(this); writerContext.setDeviceMonitor(deviceMonitor); - this.partitionType = partitionType; currentTierWriter = - storageManager - .storagePolicy() - .createFileWriter(writerContext, partitionType, numPendingWrites, notifier); + storageManager.storagePolicy().createFileWriter(writerContext, numPendingWrites, notifier); } public DiskFileInfo getDiskFileInfo() { @@ -198,8 +192,7 @@ public class PartitionDataWriter implements DeviceObserver { TierWriterBase newTierWriter = storageManager .storagePolicy() - .getEvictedFileWriter( - currentTierWriter, writerContext, partitionType, numPendingWrites, notifier); + .getEvictedFileWriter(currentTierWriter, writerContext, numPendingWrites, notifier); currentTierWriter.evict(newTierWriter); currentTierWriter = newTierWriter; } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index ddd9c6666..4d5a35449 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -478,8 +478,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs workerSource, conf, deviceMonitor, - partitionDataWriterContext, - partitionType) + partitionDataWriterContext) } catch { case e: Exception => logError("Create partition data writer failed", e) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 798ad165e..a475d12f2 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -36,7 +36,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: def getEvictedFileWriter( celebornFile: TierWriterBase, partitionDataWriterContext: PartitionDataWriterContext, - partitionType: PartitionType, numPendingWrites: AtomicInteger, notifier: FlushNotifier): TierWriterBase = { evictFileOrder.foreach { order => @@ -44,7 +43,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: if (orderList != null) { return createFileWriter( partitionDataWriterContext, - partitionType, numPendingWrites, notifier, orderList, @@ -57,12 +55,10 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: def createFileWriter( partitionDataWriterContext: PartitionDataWriterContext, - partitionType: PartitionType, numPendingWrites: AtomicInteger, notifier: FlushNotifier): TierWriterBase = { createFileWriter( partitionDataWriterContext: PartitionDataWriterContext, - partitionType: PartitionType, numPendingWrites: AtomicInteger, notifier: FlushNotifier, createFileOrder) @@ -70,7 +66,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: def createFileWriter( partitionDataWriterContext: PartitionDataWriterContext, - partitionType: PartitionType, numPendingWrites: AtomicInteger, notifier: FlushNotifier, order: Option[List[String]] = createFileOrder, @@ -84,7 +79,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } def getPartitionMetaHandler(fileInfo: FileInfo) = { - partitionType match { + partitionDataWriterContext.getPartitionType match { case PartitionType.REDUCE => new ReducePartitionMetaHandler(partitionDataWriterContext.isRangeReadFilter, fileInfo) case PartitionType.MAP => diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java index 9d5c6c26a..471a44865 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java @@ -103,7 +103,7 @@ public class PartitionDataWriterSuiteUtils { context, storageManager)) .when(storagePolicy) - .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any()); return storageManager; } @@ -148,7 +148,7 @@ public class PartitionDataWriterSuiteUtils { writerContext, storageManager)) .when(storagePolicy) - .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any()); return storageManager; } @@ -223,7 +223,7 @@ public class PartitionDataWriterSuiteUtils { writerContext, storageManager)) .when(storagePolicy) - .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any()); Mockito.doAnswer( invocation -> @@ -240,8 +240,7 @@ public class PartitionDataWriterSuiteUtils { writerContext, storageManager)) .when(storagePolicy) - .getEvictedFileWriter( - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + .getEvictedFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); return storageManager; } diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java index 87ed418a3..329d7f90f 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java @@ -141,8 +141,7 @@ public class DiskMapPartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.MAP); + context); fileWriter.handleEvents( PbPushDataHandShake.newBuilder().setNumPartitions(2).setBufferSize(32).build()); fileWriter.handleEvents( diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java index 1faa04a55..536b4aab3 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java @@ -288,8 +288,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.REDUCE); + context); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1"); @@ -343,8 +342,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.REDUCE); + context); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1"); @@ -399,8 +397,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.REDUCE); + context); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -470,8 +467,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.REDUCE); + context); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -590,8 +586,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context1, - PartitionType.REDUCE); + context1); partitionDataWriter.write(generateData(8 * 1024 * 1024)); partitionDataWriter.close(); ReduceFileMeta reduceFileMeta = @@ -623,8 +618,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context2, - PartitionType.REDUCE); + context2); for (int i = 0; i < 8; i++) { partitionDataWriter.write(generateData(128)); } @@ -656,8 +650,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context3, - PartitionType.REDUCE); + context3); partitionDataWriter.write(generateData(1020)); partitionDataWriter.write(generateData(3)); partitionDataWriter.close(); @@ -688,8 +681,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context4, - PartitionType.REDUCE); + context4); for (int i = 0; i < 8; i++) { partitionDataWriter.write(generateData(128)); } @@ -722,8 +714,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context5, - PartitionType.REDUCE); + context5); for (int i = 0; i < 16; i++) { partitionDataWriter.write(generateData(128)); } @@ -755,8 +746,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context6, - PartitionType.REDUCE); + context6); for (int i = 0; i < 16; i++) { partitionDataWriter.write(generateData(128)); } @@ -790,8 +780,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context7, - PartitionType.REDUCE); + context7); for (int i = 0; i < 16; i++) { partitionDataWriter.write(generateData(128)); } @@ -824,8 +813,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context8, - PartitionType.REDUCE); + context8); partitionDataWriter.write(generateData(1024)); for (int i = 0; i < 9; i++) { partitionDataWriter.write(generateData(128)); @@ -859,8 +847,7 @@ public class DiskReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - context9, - PartitionType.REDUCE); + context9); partitionDataWriter.write(generateData(1024)); for (int i = 0; i < 9; i++) { partitionDataWriter.write(generateData(128)); diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java index 006035d85..8aa0311be 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java @@ -123,12 +123,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { storageManager)) .when(storagePolicy) .createFileWriter( - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.anyBoolean()); + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); return storageManager; } @@ -305,8 +300,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext, - PartitionType.REDUCE); + writerContext); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1"); @@ -360,8 +354,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext, - PartitionType.REDUCE); + writerContext); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1"); @@ -424,8 +417,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context1, - PartitionType.REDUCE); + context1); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -486,8 +478,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.REDUCE); + context); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -575,8 +566,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, CONF, DeviceMonitor$.MODULE$.EmptyMonitor(), - context, - PartitionType.REDUCE); + context); List<Future<?>> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -698,8 +688,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext1, - PartitionType.REDUCE); + writerContext1); partitionDataWriter.write(generateDataWithHeader(8 * 1024 * 1024)); partitionDataWriter.close(); ReduceFileMeta reduceFileMeta = @@ -731,8 +720,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext2, - PartitionType.REDUCE); + writerContext2); for (int i = 0; i < 8; i++) { partitionDataWriter.write(generateDataWithHeader(128)); } @@ -764,8 +752,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext3, - PartitionType.REDUCE); + writerContext3); partitionDataWriter.write(generateDataWithHeader(1020)); partitionDataWriter.write(generateDataWithHeader(3)); partitionDataWriter.close(); @@ -796,8 +783,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext4, - PartitionType.REDUCE); + writerContext4); for (int i = 0; i < 8; i++) { partitionDataWriter.write(generateDataWithHeader(128)); } @@ -830,8 +816,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext5, - PartitionType.REDUCE); + writerContext5); for (int i = 0; i < 16; i++) { partitionDataWriter.write(generateDataWithHeader(128)); } @@ -863,8 +848,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext6, - PartitionType.REDUCE); + writerContext6); for (int i = 0; i < 16; i++) { partitionDataWriter.write(generateDataWithHeader(128)); } @@ -898,8 +882,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext7, - PartitionType.REDUCE); + writerContext7); for (int i = 0; i < 16; i++) { partitionDataWriter.write(generateDataWithHeader(128)); } @@ -932,8 +915,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext8, - PartitionType.REDUCE); + writerContext8); partitionDataWriter.write(generateDataWithHeader(1024)); for (int i = 0; i < 9; i++) { partitionDataWriter.write(generateDataWithHeader(128)); @@ -967,8 +949,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { source, conf, DeviceMonitor$.MODULE$.EmptyMonitor(), - writerContext9, - PartitionType.REDUCE); + writerContext9); partitionDataWriter.write(generateDataWithHeader(1024)); for (int i = 0; i < 9; i++) { partitionDataWriter.write(generateDataWithHeader(128)); diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala index 09f34cb40..640a67113 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala @@ -99,6 +99,7 @@ class StoragePolicyCase1 extends CelebornFunSuite { test("test create file order case1") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation) + when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) val conf = new CelebornConf() val flushLock = new AnyRef conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3") @@ -107,7 +108,6 @@ class StoragePolicyCase1 extends CelebornFunSuite { val notifier = new FlushNotifier val file = storagePolicy.createFileWriter( mockedPartitionWriterContext, - PartitionType.REDUCE, pendingWriters, notifier) assert(file.isInstanceOf[MemoryTierWriter]) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala index 0255262f9..28f6e87e0 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala @@ -99,6 +99,7 @@ class StoragePolicyCase2 extends CelebornFunSuite { test("test create file order case2") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin) + when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) val conf = new CelebornConf() conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS,S3") @@ -107,7 +108,6 @@ class StoragePolicyCase2 extends CelebornFunSuite { val notifier = new FlushNotifier val file = storagePolicy.createFileWriter( mockedPartitionWriterContext, - PartitionType.REDUCE, pendingWriters, notifier) assert(file.isInstanceOf[LocalTierWriter]) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala index c307adc37..cb21b7128 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala @@ -99,6 +99,7 @@ class StoragePolicyCase3 extends CelebornFunSuite { test("test getEvicted file case1") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin) + when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) val mockedMemoryFile = mock[LocalTierWriter] val conf = new CelebornConf() @@ -110,7 +111,6 @@ class StoragePolicyCase3 extends CelebornFunSuite { val nFile = storagePolicy.getEvictedFileWriter( mockedMemoryFile, mockedPartitionWriterContext, - PartitionType.REDUCE, pendingWriters, notifier) assert(nFile.isInstanceOf[LocalTierWriter]) @@ -118,6 +118,7 @@ class StoragePolicyCase3 extends CelebornFunSuite { test("test evict file case2") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation) + when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) val mockedMemoryFile = mock[LocalTierWriter] val conf = new CelebornConf() @@ -129,7 +130,6 @@ class StoragePolicyCase3 extends CelebornFunSuite { val nFile = storagePolicy.getEvictedFileWriter( mockedMemoryFile, mockedPartitionWriterContext, - PartitionType.REDUCE, pendingWriters, notifier) assert(nFile.isInstanceOf[LocalTierWriter]) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala index eae4d0727..e27def478 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala @@ -100,6 +100,7 @@ class StoragePolicyCase4 extends CelebornFunSuite { test("test create file fallback case1") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer( memoryDisabledHintPartitionLocation) + when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) val conf = new CelebornConf() conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3") @@ -108,7 +109,6 @@ class StoragePolicyCase4 extends CelebornFunSuite { val notifier = new FlushNotifier val file = storagePolicy.createFileWriter( mockedPartitionWriterContext, - PartitionType.REDUCE, pendingWriters, notifier) assert(file.isInstanceOf[LocalTierWriter])
