This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e734ceb55 [MINOR] Cleanup code
e734ceb55 is described below
commit e734ceb55830c24d9201efdb2d4cc17a60aff90f
Author: sychen <[email protected]>
AuthorDate: Mon Jun 19 11:31:51 2023 +0800
[MINOR] Cleanup code
### What changes were proposed in this pull request?
1. Remove some unused imports, found by compiling with `<arg>-Ywarn-unused-import</arg>`.
The `<arg>-Ywarn-unused-import</arg>` flag cannot be kept enabled in the build at this stage,
because we still have the following import (needed while Scala 2.11 is supported):
```
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
```
2. Add default cases to Scala `match` expressions that were not exhaustive, to avoid `scala.MatchError`
3. Fix some Scala compilation warnings
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1600 from cxzl25/cleanup_code.
Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/RssShuffleReader.scala | 2 +-
.../scala/org/apache/celeborn/client/LifecycleManager.scala | 10 ++++++++--
.../org/apache/celeborn/client/ReleasePartitionManager.scala | 2 +-
.../org/apache/celeborn/client/commit/CommitHandler.scala | 2 +-
.../apache/celeborn/common/internal/config/ConfigBuilder.scala | 1 -
.../apache/celeborn/common/internal/config/ConfigEntry.scala | 2 --
.../celeborn/common/meta/ShufflePartitionLocationInfo.scala | 1 -
.../scala/org/apache/celeborn/common/meta/WorkerInfo.scala | 2 --
.../org/apache/celeborn/common/metrics/MetricsSystem.scala | 2 +-
.../apache/celeborn/common/metrics/source/AbstractSource.scala | 2 +-
.../org/apache/celeborn/common/metrics/source/JVMSource.scala | 1 +
.../celeborn/common/protocol/message/ControlMessages.scala | 4 ++--
.../org/apache/celeborn/common/util/FunctionConverter.scala | 2 ++
.../scala/org/apache/celeborn/common/util/ThreadUtils.scala | 2 +-
.../src/main/scala/org/apache/celeborn/common/util/Utils.scala | 2 +-
.../org/apache/celeborn/common/meta/WorkerInfoSuite.scala | 2 +-
.../org/apache/celeborn/service/deploy/master/Master.scala | 1 -
.../celeborn/service/deploy/master/MasterArguments.scala | 1 -
.../service/deploy/master/clustermeta/ha/MasterNode.scala | 2 --
pom.xml | 9 +++++++++
.../scala/org/apache/celeborn/tests/flink/WordCountTest.scala | 2 --
.../celeborn/service/deploy/worker/PushDataHandler.scala | 8 +++++++-
.../service/deploy/worker/storage/StorageManager.scala | 2 ++
23 files changed, 39 insertions(+), 25 deletions(-)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
index f647dd29e..e0cc31a06 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{InterruptibleIterator, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.{ShuffleReader, ShuffleReadMetricsReporter}
import org.apache.spark.sql.execution.UnsafeRowSerializer
-import org.apache.spark.sql.execution.columnar.{RssBatchBuilder,
RssColumnarBatchBuilder, RssColumnarBatchSerializer}
+import org.apache.spark.sql.execution.columnar.{RssBatchBuilder,
RssColumnarBatchSerializer}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 77dcfc559..779dec0ef 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.client
import java.util
-import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+import java.util.{function, List => JList}
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
@@ -297,6 +297,8 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
attemptId,
partitionId,
numMappers)
+ case _ =>
+ throw new UnsupportedOperationException(s"Not support $partitionType
yet")
}
case GetReducerFileGroup(applicationId: String, shuffleId: Int) =>
@@ -340,6 +342,8 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
initialLocs)
case PartitionType.REDUCE =>
context.reply(RegisterShuffleResponse(StatusCode.SUCCESS,
initialLocs))
+ case _ =>
+ throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
return
}
@@ -398,6 +402,8 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
context.reply(response)
}
case PartitionType.REDUCE => context.reply(response)
+ case _ =>
+ throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
}))
registeringShuffleRequest.remove(shuffleId)
@@ -406,7 +412,7 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
// First, request to get allocated slots from Master
val ids = new util.ArrayList[Integer](numPartitions)
- (0 until numPartitions).foreach(idx => ids.add(new Integer(idx)))
+ (0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
val res = requestMasterRequestSlotsWithRetry(applicationId, shuffleId, ids)
res.status match {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
index 6dc635078..f8ae855f2 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.client
import java.util
-import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService,
ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture,
TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index b88af9723..1b9a9e5f2 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
-import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers, ShuffleFileGroups}
+import org.apache.celeborn.client.LifecycleManager.{ShuffleFailedWorkers,
ShuffleFileGroups}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo,
WorkerInfo}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
index 76fd10f09..0dff64d4d 100644
---
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
@@ -22,7 +22,6 @@ import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex
-import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.network.util.ByteUnit
import org.apache.celeborn.common.util.{JavaUtils, Utils}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
index ebd391399..9360688b4 100644
---
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
@@ -17,8 +17,6 @@
package org.apache.celeborn.common.internal.config
-import java.util.concurrent.ConcurrentHashMap
-
import
org.apache.celeborn.common.internal.config.ConfigHelpers.AlternativesTransfer
import org.apache.celeborn.common.util.JavaUtils
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
index b40bc2917..6e661ef8a 100644
---
a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
@@ -19,7 +19,6 @@ package org.apache.celeborn.common.meta
import java.util
import java.util.concurrent.ConcurrentHashMap
-import java.util.stream.Collectors
import scala.collection.JavaConverters._
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 407bb960a..02f036bcb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -18,8 +18,6 @@
package org.apache.celeborn.common.meta
import java.util
-import java.util.Objects
-import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 9f60c5cd1..0d20cf184 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -109,7 +109,7 @@ class MetricsSystem(
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
- val source = Utils.classForName(classPath).newInstance()
+ val source =
Utils.classForName(classPath).getDeclaredConstructor().newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot
be instantiated", e)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index a445d74c7..d28986bb0 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -421,7 +421,7 @@ class TimerSupplier(val slidingWindowSize: Int)
class GaugeSupplier[T](f: Unit => T) extends
MetricRegistry.MetricSupplier[Gauge[_]] {
override def newMetric(): Gauge[T] = {
new Gauge[T] {
- override def getValue: T = f()
+ override def getValue: T = f(())
}
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
index a6e2af925..374024dd2 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
@@ -37,6 +37,7 @@ class JVMSource(conf: CelebornConf, role: String) extends
AbstractSource(conf, r
.map { x =>
x.getMetrics.asScala.map {
case (name: String, metric: Gauge[_]) => addGauge(name, metric)
+ case (name, metric) => new IllegalArgumentException(s"Unknown metric
type: $name: $metric")
}
}
// start cleaner
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 438369a42..7e26561ce 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -574,7 +574,7 @@ object ControlMessages extends Logging {
PbFileGroup.newBuilder().addAllLocations(fileGroup.asScala.map(PbSerDeUtils
.toPbPartitionLocation).toList.asJava).build())
}.asJava)
- builder.addAllAttempts(attempts.map(new Integer(_)).toIterable.asJava)
+ builder.addAllAttempts(attempts.map(Integer.valueOf).toIterable.asJava)
builder.addAllPartitionIds(partitionIds)
val payload = builder.build().toByteArray
new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE,
payload)
@@ -734,7 +734,7 @@ object ControlMessages extends Logging {
.setShuffleId(shuffleId)
.addAllMasterIds(masterIds)
.addAllSlaveIds(slaveIds)
- .addAllMapAttempts(mapAttempts.map(new Integer(_)).toIterable.asJava)
+ .addAllMapAttempts(mapAttempts.map(Integer.valueOf).toIterable.asJava)
.setEpoch(epoch)
.build().toByteArray
new TransportMessage(MessageType.COMMIT_FILES, payload)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
index 71c75cf28..8eef121f4 100644
---
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
@@ -17,6 +17,8 @@
package org.apache.celeborn.common.util
+import scala.language.implicitConversions
+
/**
* Implicit conversion for scala(2.11) function to java function
*/
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index d4cb4417b..ca5066c34 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -18,12 +18,12 @@
package org.apache.celeborn.common.util
import java.util.concurrent._
+import java.util.concurrent.{ForkJoinPool => SForkJoinPool,
ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Awaitable, ExecutionContext,
ExecutionContextExecutor, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool,
ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.language.higherKinds
import scala.util.control.NonFatal
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index dd835c35b..f5fa1bfe3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -27,7 +27,7 @@ import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util
import java.util.{Locale, Properties, Random, UUID}
-import java.util.concurrent.{Callable, ConcurrentHashMap, ThreadPoolExecutor,
TimeoutException, TimeUnit}
+import java.util.concurrent.{Callable, ThreadPoolExecutor, TimeoutException,
TimeUnit}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 2a8b41b0c..8ccf1d295 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -19,7 +19,7 @@ package org.apache.celeborn.common.meta
import java.util
import java.util.{Map => jMap}
-import java.util.concurrent.{ConcurrentHashMap, Future, ThreadLocalRandom}
+import java.util.concurrent.{Future, ThreadLocalRandom}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 386038019..563580101 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.CelebornRuntimeException
import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
index 7b0d07e70..331e9a8b3 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
@@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.master
import scala.annotation.tailrec
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.CelebornConf._
import org.apache.celeborn.common.util.{IntParam, Utils}
import
org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterClusterInfo
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index 0ad315704..a1ac2b67e 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -20,8 +20,6 @@ package
org.apache.celeborn.service.deploy.master.clustermeta.ha
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress}
-import scala.util.{Failure, Success}
-
import org.apache.ratis.util.NetUtils
import org.apache.celeborn.common.internal.Logging
diff --git a/pom.xml b/pom.xml
index 1ecf80ce8..f7d84964e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -653,6 +653,15 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${maven.plugin.scala.version}</version>
+ <configuration>
+ <args>
+ <arg>-unchecked</arg>
+ <arg>-deprecation</arg>
+ <arg>-feature</arg>
+ <arg>-explaintypes</arg>
+ <arg>-Xfatal-warnings</arg>
+ </args>
+ </configuration>
<executions>
<execution>
<id>scala-compile-first</id>
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index 463380855..fbf97665b 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -60,7 +60,6 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
"org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory")
configuration.setString("celeborn.master.endpoints", "localhost:9097")
configuration.setString("execution.batch-shuffle-mode",
"ALL_EXCHANGES_BLOCKING")
- configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
configuration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
configuration.setString("taskmanager.memory.network.min", "1024m")
configuration.setString(RestOptions.BIND_PORT, "8081-8089")
@@ -70,7 +69,6 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
val env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
env.getConfig.setExecutionMode(ExecutionMode.BATCH)
env.getConfig.setParallelism(parallelism)
-
env.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL)
env.disableOperatorChaining()
// make parameters available in the web interface
WordCountHelper.execute(env, parallelism)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 91ddc16f3..3695ba20d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -94,13 +94,16 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
client,
pushData.requestId,
pushData.shuffleKey)
- shufflePartitionType.getOrDefault(pushData.shuffleKey,
PartitionType.REDUCE) match {
+ val partitionType =
+ shufflePartitionType.getOrDefault(pushData.shuffleKey,
PartitionType.REDUCE)
+ partitionType match {
case PartitionType.REDUCE => handlePushData(
pushData,
callback)
case PartitionType.MAP => handleMapPartitionPushData(
pushData,
callback)
+ case _ => throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
})
case pushMergedData: PushMergedData =>
@@ -843,6 +846,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
(WorkerSource.MasterRegionStartTime,
WorkerSource.SlaveRegionStartTime)
case Type.REGION_FINISH =>
(WorkerSource.MasterRegionFinishTime,
WorkerSource.SlaveRegionFinishTime)
+ case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
val location =
@@ -891,6 +895,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
message.asInstanceOf[RegionStart].isBroadcast)
case Type.REGION_FINISH =>
fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
+ case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
// for master, send data to slave
if (location.hasPeer && isMaster) {
@@ -1004,6 +1009,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
case Type.REGION_FINISH => (
StatusCode.REGION_FINISH_FAIL_MASTER,
StatusCode.REGION_FINISH_FAIL_SLAVE)
+ case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
callback.onFailure(new CelebornIOException(
if (isMaster) messageMaster else messageSlave,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 3c0267fa1..62cf7a3b5 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -311,6 +311,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
splitThreshold,
splitMode,
rangeReadFilter)
+ case _ => throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
@@ -354,6 +355,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
splitThreshold,
splitMode,
rangeReadFilter)
+ case _ => throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
deviceMonitor.registerFileWriter(fileWriter)
val map = workingDirWriters.computeIfAbsent(dir,
workingDirWriterListFunc)