This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 2fda21195 [CELEBORN-2055] Fix some typos
2fda21195 is described below
commit 2fda21195985f966dedf779bb4dc8b7f998d1cc8
Author: codenohup <[email protected]>
AuthorDate: Thu Jul 10 12:01:02 2025 +0800
[CELEBORN-2055] Fix some typos
### What changes were proposed in this pull request?
Inspired by
[FLINK-38038](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-38038?filter=allissues),
I used [Tongyi Lingma](https://lingma.aliyun.com/) and qwen3-thinking LLM to
identify and fix some typo issues in the Celeborn codebase. For example:
- backLog → backlog
- won`t → won't
- can to be read → can be read
- mapDataPartition → mapPartitionData
- UserDefinePasswordAuthenticationProviderImpl →
UserDefinedPasswordAuthenticationProviderImpl
### Why are the changes needed?
Remove typos to improve source code readability for users and ease
development for developers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Code and documentation cleanup does not require additional testing.
Closes #3356 from codenohup/fix-typo.
Authored-by: codenohup <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 0fa600ade1ca362ec8bc9156edfc6eac09689a5e)
Signed-off-by: SteNicholas <[email protected]>
---
charts/celeborn/templates/worker/_helpers.tpl | 4 ++--
charts/celeborn/values.yaml | 2 +-
.../org/apache/celeborn/cli/TestCelebornCliCommands.scala | 8 ++++----
.../celeborn/plugin/flink/client/CelebornBufferStream.java | 6 +++---
.../plugin/flink/tiered/CelebornChannelBufferReader.java | 4 +---
.../plugin/flink/tiered/CelebornTierConsumerAgent.java | 4 ++--
.../celeborn/plugin/flink/tiered/CelebornTierFactory.java | 4 ++--
.../plugin/flink/tiered/CelebornChannelBufferReader.java | 4 +---
.../plugin/flink/tiered/CelebornTierConsumerAgent.java | 4 ++--
.../celeborn/plugin/flink/tiered/CelebornTierFactory.java | 4 ++--
.../java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 2 +-
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 6 +++---
.../scala/org/apache/celeborn/client/LifecycleManager.scala | 4 ++--
.../apache/celeborn/client/RequestLocationCallContext.scala | 2 +-
.../org/apache/celeborn/client/commit/CommitHandler.scala | 2 +-
.../java/org/apache/celeborn/client/ShuffleClientSuiteJ.java | 8 ++++----
.../celeborn/common/network/server/TransportServer.java | 4 ++--
.../apache/celeborn/common/network/util/TransportConf.java | 2 +-
.../apache/celeborn/common/protocol/message/StatusCode.java | 4 ++--
cpp/celeborn/client/reader/WorkerPartitionReader.h | 2 +-
.../org/apache/celeborn/service/deploy/master/Master.scala | 8 ++++----
.../common/http/ApiBaseResourceAuthenticationSuite.scala | 12 ++++++------
...a => UserDefinedPasswordAuthenticationProviderImpl.scala} | 6 +++---
.../celeborn/service/deploy/worker/memory/BufferQueue.java | 2 +-
.../celeborn/service/deploy/worker/memory/MemoryManager.java | 8 ++++----
.../service/deploy/worker/storage/CreditStreamManager.java | 8 ++++----
.../service/deploy/worker/storage/MapPartitionData.java | 2 +-
.../worker/storage/segment/SegmentMapPartitionData.java | 6 +++---
.../storage/segment/SegmentMapPartitionDataReader.java | 2 +-
.../apache/celeborn/service/deploy/worker/Controller.scala | 4 ++--
.../deploy/worker/storage/CreditStreamManagerSuiteJ.java | 4 ++--
31 files changed, 69 insertions(+), 73 deletions(-)
diff --git a/charts/celeborn/templates/worker/_helpers.tpl
b/charts/celeborn/templates/worker/_helpers.tpl
index c135972ef..8d9e86266 100644
--- a/charts/celeborn/templates/worker/_helpers.tpl
+++ b/charts/celeborn/templates/worker/_helpers.tpl
@@ -16,7 +16,7 @@ limitations under the License.
*/}}
{{/*
-Common labels for Celeborn master resources
+Common labels for Celeborn worker resources
*/}}
{{- define "celeborn.worker.labels" -}}
{{ include "celeborn.labels" . }}
@@ -24,7 +24,7 @@ app.kubernetes.io/role: worker
{{- end }}
{{/*
-Selector labels for Celeborn master pods
+Selector labels for Celeborn worker pods
*/}}
{{- define "celeborn.worker.selectorLabels" -}}
{{ include "celeborn.selectorLabels" . }}
diff --git a/charts/celeborn/values.yaml b/charts/celeborn/values.yaml
index 5bf607a88..f77177c4b 100644
--- a/charts/celeborn/values.yaml
+++ b/charts/celeborn/values.yaml
@@ -35,7 +35,7 @@ image:
tag: ""
# -- Image pull policy
pullPolicy: Always
- # -- Image name for init containter. (your-private-repo/alpine:3.18)
+ # -- Image name for init container. (your-private-repo/alpine:3.18)
initContainerImage: alpine:3.18
# -- Image pull secrets for private image registry
diff --git
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index aa8a60151..5422239d1 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -26,7 +26,7 @@ import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.authentication.HttpAuthSchemes
-import
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
+import
org.apache.celeborn.server.common.http.authentication.{UserDefinedPasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.worker.Worker
@@ -38,11 +38,11 @@ class TestCelebornCliCommands extends CelebornFunSuite with
MiniClusterFeature {
.set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
.set(
CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
- classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
.set(
CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
- classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
.set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "DB")
@@ -54,7 +54,7 @@ class TestCelebornCliCommands extends CelebornFunSuite with
MiniClusterFeature {
private val BASIC_AUTH_HEADER = HttpAuthSchemes.BASIC + " " + new String(
Base64.getEncoder.encode(
-
s"$CELEBORN_ADMINISTER:${UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()),
+
s"$CELEBORN_ADMINISTER:${UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()),
StandardCharsets.UTF_8)
protected var master: Master = _
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
index b63757d2d..a0a498ff4 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
@@ -193,7 +193,7 @@ public class CelebornBufferStream {
}
}
- private void cleanStream(long streamId) {
+ private void cleanupStream(long streamId) {
if (isOpenSuccess) {
mapShuffleClient.getReadClientHandler().removeHandler(streamId);
clientFactory.unregisterSupplier(streamId);
@@ -204,7 +204,7 @@ public class CelebornBufferStream {
public void close() {
synchronized (lock) {
- cleanStream(streamId);
+ cleanupStream(streamId);
isClosed = true;
}
}
@@ -222,7 +222,7 @@ public class CelebornBufferStream {
locations.length);
if (currentLocationIndex.get() > 0) {
logger.debug("Get end streamId {}", endedStreamId);
- cleanStream(endedStreamId);
+ cleanupStream(endedStreamId);
}
if (currentLocationIndex.get() < locations.length) {
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
index e245efab9..883d90c89 100644
---
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
@@ -46,9 +46,7 @@ import
org.apache.celeborn.plugin.flink.client.CelebornBufferStream;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.protocol.SubPartitionReadData;
-/**
- * Wrap the {@link CelebornBufferStream}, utilize in flink hybrid shuffle
integration strategy now.
- */
+/** Wrap the {@link CelebornBufferStream}, used in flink hybrid shuffle
integration strategy now. */
public class CelebornChannelBufferReader {
private static final Logger LOG =
LoggerFactory.getLogger(CelebornChannelBufferReader.class);
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
index 0c5c454ee..7bfd9d163 100644
---
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
@@ -79,7 +79,7 @@ public class CelebornTierConsumerAgent implements
TierConsumerAgent {
/**
* partitionId -> subPartitionId -> reader, note that subPartitions may
share the same reader, as
- * a single reader can consume multiple subPartitions to improvement
performance.
+ * a single reader can consume multiple subPartitions to improve performance.
*/
private final Map<
TieredStoragePartitionId, Map<TieredStorageSubpartitionId,
CelebornChannelBufferReader>>
@@ -111,7 +111,7 @@ public class CelebornTierConsumerAgent implements
TierConsumerAgent {
/**
* The notify target is flink inputGate, used in notify input gate which
subPartition contain
- * shuffle data that can to be read.
+ * shuffle data that can be read.
*/
private AvailabilityNotifier availabilityNotifier;
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
index c9913d132..3c821363e 100644
---
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
@@ -56,7 +56,7 @@ public class CelebornTierFactory implements TierFactory {
* The max bytes size of a single segment, it will determine how many buffer
can save in a single
* segment.
*/
- private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
+ private static int MAX_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
private static final String CELEBORN_TIER_NAME =
CelebornTierFactory.class.getSimpleName();
@@ -106,7 +106,7 @@ public class CelebornTierFactory implements TierFactory {
partitionId,
numPartitions,
numSubpartitions,
- NUM_BYTES_PER_SEGMENT,
+ MAX_BYTES_PER_SEGMENT,
bufferSizeBytes,
storageMemoryManager,
resourceRegistry,
diff --git
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
index e245efab9..883d90c89 100644
---
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
+++
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
@@ -46,9 +46,7 @@ import
org.apache.celeborn.plugin.flink.client.CelebornBufferStream;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.protocol.SubPartitionReadData;
-/**
- * Wrap the {@link CelebornBufferStream}, utilize in flink hybrid shuffle
integration strategy now.
- */
+/** Wrap the {@link CelebornBufferStream}, used in flink hybrid shuffle
integration strategy now. */
public class CelebornChannelBufferReader {
private static final Logger LOG =
LoggerFactory.getLogger(CelebornChannelBufferReader.class);
diff --git
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
index 0c5c454ee..7bfd9d163 100644
---
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
+++
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java
@@ -79,7 +79,7 @@ public class CelebornTierConsumerAgent implements
TierConsumerAgent {
/**
* partitionId -> subPartitionId -> reader, note that subPartitions may
share the same reader, as
- * a single reader can consume multiple subPartitions to improvement
performance.
+ * a single reader can consume multiple subPartitions to improve performance.
*/
private final Map<
TieredStoragePartitionId, Map<TieredStorageSubpartitionId,
CelebornChannelBufferReader>>
@@ -111,7 +111,7 @@ public class CelebornTierConsumerAgent implements
TierConsumerAgent {
/**
* The notify target is flink inputGate, used in notify input gate which
subPartition contain
- * shuffle data that can to be read.
+ * shuffle data that can be read.
*/
private AvailabilityNotifier availabilityNotifier;
diff --git
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
index aeaa81831..fbb17f4f7 100644
---
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
+++
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
@@ -59,7 +59,7 @@ public class CelebornTierFactory implements TierFactory {
* The max bytes size of a single segment, it will determine how many buffer
can save in a single
* segment.
*/
- private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
+ private static int MAX_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
private static final String CELEBORN_TIER_NAME =
CelebornTierFactory.class.getSimpleName();
@@ -110,7 +110,7 @@ public class CelebornTierFactory implements TierFactory {
partitionId,
numPartitions,
numSubpartitions,
- NUM_BYTES_PER_SEGMENT,
+ MAX_BYTES_PER_SEGMENT,
bufferSizeBytes,
storageMemoryManager,
resourceRegistry,
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index a3a1add71..bd57cd329 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -118,7 +118,7 @@ public class SparkUtils {
field.setAccessible(true);
return (SQLMetric) field.get(serializer);
} catch (NoSuchFieldException | IllegalAccessException e) {
- logger.warn("Failed to get dataSize metric, aqe won`t work properly.");
+ logger.warn("Failed to get dataSize metric, aqe won't work properly.");
}
return null;
}
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d9a1e67d6..828eb3158 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -903,7 +903,7 @@ public class ShuffleClientImpl extends ShuffleClient {
} else if (StatusCode.STAGE_ENDED.getValue() == statusCode) {
stageEndShuffleSet.add(shuffleId);
return results;
- } else if (StatusCode.SHUFFLE_NOT_REGISTERED.getValue() == statusCode)
{
+ } else if (StatusCode.SHUFFLE_UNREGISTERED.getValue() == statusCode) {
logger.error("SHUFFLE_NOT_REGISTERED!");
return null;
}
@@ -1832,7 +1832,7 @@ public class ShuffleClientImpl extends ShuffleClient {
response.pushFailedBatches()),
null,
null);
- case SHUFFLE_NOT_REGISTERED:
+ case SHUFFLE_UNREGISTERED:
logger.warn(
"Request {} return {} for {}.", getReducerFileGroup,
response.status(), shuffleId);
// return empty result
@@ -1844,7 +1844,7 @@ public class ShuffleClientImpl extends ShuffleClient {
response.pushFailedBatches()),
null,
null);
- case STAGE_END_TIME_OUT:
+ case STAGE_END_TIMEOUT:
case SHUFFLE_DATA_LOST:
exceptionMsg =
String.format(
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9063ce7b8..b339c75e6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -797,7 +797,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
logError(s"[handleRevive] shuffle $shuffleId not registered!")
contextWrapper.reply(
-1,
- StatusCode.SHUFFLE_NOT_REGISTERED,
+ StatusCode.SHUFFLE_UNREGISTERED,
None,
false)
return
@@ -871,7 +871,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
if (!registeredShuffle.contains(shuffleId) &&
!isSegmentGranularityVisible) {
logWarning(s"[handleGetReducerFileGroup] shuffle $shuffleId not
registered, maybe no shuffle data within this stage.")
context.reply(GetReducerFileGroupResponse(
- StatusCode.SHUFFLE_NOT_REGISTERED,
+ StatusCode.SHUFFLE_UNREGISTERED,
JavaUtils.newConcurrentHashMap(),
Array.empty,
serdeVersion = serdeVersion))
diff --git
a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
index d306546b3..091960a4c 100644
---
a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
@@ -57,7 +57,7 @@ case class ChangeLocationsCallContext(
}
newLocs.put(partitionId, (status, available,
partitionLocationOpt.getOrElse(null)))
- if (newLocs.size() == partitionCount || StatusCode.SHUFFLE_NOT_REGISTERED
== status
+ if (newLocs.size() == partitionCount || StatusCode.SHUFFLE_UNREGISTERED ==
status
|| StatusCode.STAGE_ENDED == status) {
context.reply(ChangeLocationResponse(endedMapIds, newLocs))
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 4a8b542f8..77034d2bd 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -335,7 +335,7 @@ abstract class CommitHandler(
status.future.value.get match {
case scala.util.Success(res) =>
res.status match {
- case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS |
StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED |
StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
+ case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS |
StatusCode.SHUFFLE_UNREGISTERED | StatusCode.REQUEST_FAILED |
StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
if (res.status == StatusCode.SUCCESS) {
logDebug(s"Request commitFiles return ${res.status} for " +
s"$shuffleKey from worker ${worker.readableAddress()}")
diff --git
a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index 85cf0ba10..d647e60b9 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -483,7 +483,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
- StatusCode.SHUFFLE_NOT_REGISTERED,
+ StatusCode.SHUFFLE_UNREGISTERED,
locations,
new int[0],
Collections.emptySet(),
@@ -496,7 +496,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
- StatusCode.SHUFFLE_NOT_REGISTERED,
+ StatusCode.SHUFFLE_UNREGISTERED,
locations,
new int[0],
Collections.emptySet(),
@@ -519,7 +519,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
- StatusCode.STAGE_END_TIME_OUT,
+ StatusCode.STAGE_END_TIMEOUT,
locations,
new int[0],
Collections.emptySet(),
@@ -532,7 +532,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
- StatusCode.STAGE_END_TIME_OUT,
+ StatusCode.STAGE_END_TIMEOUT,
locations,
new int[0],
Collections.emptySet(),
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index 1808a000c..a8e7e752b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -104,8 +104,8 @@ public class TransportServer implements Closeable {
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, allocator);
- if (conf.backLog() > 0) {
- bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
+ if (conf.backlog() > 0) {
+ bootstrap.option(ChannelOption.SO_BACKLOG, conf.backlog());
}
if (conf.receiveBuf() > 0) {
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
index 973064451..55b22306d 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
@@ -70,7 +70,7 @@ public class TransportConf {
}
/** Requested maximum length of the queue of incoming connections. Default 0
for no backlog. */
- public int backLog() {
+ public int backlog() {
return celebornConf.networkIoBacklog(module);
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 46c59ab52..8987e8d83 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -31,7 +31,7 @@ public enum StatusCode {
// Specific Status
SHUFFLE_ALREADY_REGISTERED(3),
- SHUFFLE_NOT_REGISTERED(4),
+ SHUFFLE_UNREGISTERED(4),
RESERVE_SLOTS_FAILED(5),
SLOT_NOT_AVAILABLE(6),
WORKER_NOT_FOUND(7),
@@ -54,7 +54,7 @@ public enum StatusCode {
HARD_SPLIT(21),
SOFT_SPLIT(22),
- STAGE_END_TIME_OUT(23),
+ STAGE_END_TIMEOUT(23),
SHUFFLE_DATA_LOST(24),
WORKER_SHUTDOWN(25),
NO_AVAILABLE_WORKING_DIR(26),
diff --git a/cpp/celeborn/client/reader/WorkerPartitionReader.h
b/cpp/celeborn/client/reader/WorkerPartitionReader.h
index 68fb00230..db22da8c2 100644
--- a/cpp/celeborn/client/reader/WorkerPartitionReader.h
+++ b/cpp/celeborn/client/reader/WorkerPartitionReader.h
@@ -87,7 +87,7 @@ class WorkerPartitionReader
static constexpr auto kDefaultConsumeIter = std::chrono::milliseconds(500);
- // TODO: add other params, such as fetchChunkRetryCnt, fetchChunkMaxRetry
+ // TODO: add other params, such as fetchChunkRetryCnt, fetchChunkMaxRetries
};
} // namespace client
} // namespace celeborn
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a91996986..52d52248a 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -162,7 +162,7 @@ private[celeborn] class Master(
logError(msg, ioe)
System.exit(1)
} else {
- logError("Face unexpected IO exception during staring Ratis
server", ioe)
+ logError("Face unexpected IO exception during starting Ratis
server", ioe)
}
}
sys
@@ -174,7 +174,7 @@ private[celeborn] class Master(
// Threads
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder")
- private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
+ private var checkForWorkerTimeoutTask: ScheduledFuture[_] = _
private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
@@ -336,7 +336,7 @@ private[celeborn] class Master(
"send-application-meta")
}
- checkForWorkerTimeOutTask = scheduleCheckTask(workerHeartbeatTimeoutMs,
pbCheckForWorkerTimeout)
+ checkForWorkerTimeoutTask = scheduleCheckTask(workerHeartbeatTimeoutMs,
pbCheckForWorkerTimeout)
checkForApplicationTimeOutTask =
scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut)
@@ -370,7 +370,7 @@ private[celeborn] class Master(
return
}
logInfo("Stopping Celeborn Master.")
- Option(checkForWorkerTimeOutTask).foreach(_.cancel(true))
+ Option(checkForWorkerTimeoutTask).foreach(_.cancel(true))
Option(checkForUnavailableWorkerTimeOutTask).foreach(_.cancel(true))
Option(checkForApplicationTimeOutTask).foreach(_.cancel(true))
Option(checkForDFSRemnantDirsTimeOutTask).foreach(_.cancel(true))
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
index 34978b619..7e62a331d 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
@@ -26,7 +26,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.authentication.HttpAuthSchemes
import org.apache.celeborn.common.network.TestHelper
import
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
-import
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
+import
org.apache.celeborn.server.common.http.authentication.{UserDefinedPasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
val administers = Seq("celeborn", "celeborn2")
@@ -38,14 +38,14 @@ abstract class ApiBaseResourceAuthenticationSuite extends
HttpTestHelper {
.set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC",
"BEARER"))
.set(
CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
- classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(
CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER,
classOf[UserDefineTokenAuthenticationProviderImpl].getName)
.set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC",
"BEARER"))
.set(
CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
- classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(
CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER,
classOf[UserDefineTokenAuthenticationProviderImpl].getName)
@@ -67,7 +67,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends
HttpTestHelper {
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
"user",
- UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
+ UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD))
.get()
assert(HttpServletResponse.SC_OK == response.getStatus)
@@ -126,7 +126,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends
HttpTestHelper {
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
"no_admin",
- UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
+ UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD))
.post(null)
assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus)
@@ -137,7 +137,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends
HttpTestHelper {
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
admin,
- UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
+ UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD))
.post(null)
// pass the admin privilege check, but the api is not found
assert(HttpServletResponse.SC_NOT_FOUND == response.getStatus)
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinedPasswordAuthenticationProviderImpl.scala
similarity index 91%
rename from
service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
rename to
service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinedPasswordAuthenticationProviderImpl.scala
index 38db5e49a..0c4c733b5 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinedPasswordAuthenticationProviderImpl.scala
@@ -21,10 +21,10 @@ import java.security.Principal
import javax.security.sasl.AuthenticationException
import org.apache.celeborn.common.internal.Logging
-import
org.apache.celeborn.server.common.http.authentication.UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD
+import
org.apache.celeborn.server.common.http.authentication.UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD
import org.apache.celeborn.spi.authentication.{BasicPrincipal, Credential,
PasswdAuthenticationProvider, PasswordCredential}
-class UserDefinePasswordAuthenticationProviderImpl
+class UserDefinedPasswordAuthenticationProviderImpl
extends PasswdAuthenticationProvider with Logging {
override def authenticate(credential: PasswordCredential): Principal = {
val clientIp = credential.extraInfo.get(Credential.CLIENT_IP_KEY)
@@ -37,6 +37,6 @@ class UserDefinePasswordAuthenticationProviderImpl
}
}
-object UserDefinePasswordAuthenticationProviderImpl {
+object UserDefinedPasswordAuthenticationProviderImpl {
val VALID_PASSWORD = "password"
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java
index 10de48a32..e7b49506a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java
@@ -30,7 +30,7 @@ import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+// Assume that max-managed memory for a MapPartitionData is (2^31 * buffersize)
public class BufferQueue {
public static final Logger logger =
LoggerFactory.getLogger(BufferQueue.class);
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index ff08c077c..6f4922db0 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -209,10 +209,10 @@ public class MemoryManager {
() -> {
try {
if (creditStreamManager != null) {
- int mapDataPartitionCount =
creditStreamManager.getActiveMapPartitionCount();
- if (mapDataPartitionCount > 0) {
+ int mapPartitionDataCount =
creditStreamManager.getActiveMapPartitionCount();
+ if (mapPartitionDataCount > 0) {
long currentTarget =
- (long) Math.ceil(readBufferTarget * 1.0 /
mapDataPartitionCount);
+ (long) Math.ceil(readBufferTarget * 1.0 /
mapPartitionDataCount);
if (Math.abs(lastNotifiedTarget - currentTarget)
> readBufferTargetNotifyThreshold) {
synchronized (readBufferTargetChangeListeners) {
@@ -220,7 +220,7 @@ public class MemoryManager {
"read buffer target changed {} -> {} active map
partition count {}",
lastNotifiedTarget,
currentTarget,
- mapDataPartitionCount);
+ mapPartitionDataCount);
for (ReadBufferTargetChangeListener changeListener :
readBufferTargetChangeListeners) {
changeListener.onChange(currentTarget);
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index 0a3736a38..e8f408afe 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -200,7 +200,7 @@ public class CreditStreamManager {
logger.warn("Ignore AddCredit from stream {}, numCredit {}.", streamId,
numCredit);
return;
}
- MapPartitionData mapPartitionData =
streams.get(streamId).getMapDataPartition();
+ MapPartitionData mapPartitionData =
streams.get(streamId).getMapPartitionData();
addCredit(mapPartitionData, numCredit, streamId);
}
@@ -208,7 +208,7 @@ public class CreditStreamManager {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
notifyRequiredSegment(
- streamState.getMapDataPartition(), requiredSegmentId, streamId,
subPartitionId);
+ streamState.getMapPartitionData(), requiredSegmentId, streamId,
subPartitionId);
} else {
// In flink hybrid shuffle integration strategy, the stream may release
in worker before
// client receive bufferStreamEnd,
@@ -279,7 +279,7 @@ public class CreditStreamManager {
public void cleanResource(Long streamId) {
logger.debug("received clean stream: {}", streamId);
if (streams.containsKey(streamId)) {
- MapPartitionData mapPartitionData =
streams.get(streamId).getMapDataPartition();
+ MapPartitionData mapPartitionData =
streams.get(streamId).getMapPartitionData();
if (mapPartitionData != null) {
if (mapPartitionData.releaseReader(streamId)) {
streams.remove(streamId);
@@ -340,7 +340,7 @@ public class CreditStreamManager {
return bufferSize;
}
- public MapPartitionData getMapDataPartition() {
+ public MapPartitionData getMapPartitionData() {
return mapPartitionData;
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index 46577e049..d88370edf 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -260,7 +260,7 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
@Override
public String toString() {
- return "MapDataPartition{" + "fileInfo=" + diskFileInfo.getFilePath() +
'}';
+ return "MapPartitionData{" + "fileInfo=" + diskFileInfo.getFilePath() +
'}';
}
public ConcurrentHashMap<Long, MapPartitionDataReader> getReaders() {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
index aa3090a3d..e8d467d8f 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
@@ -57,7 +57,7 @@ public class SegmentMapPartitionData extends MapPartitionData
{
@Override
public void setupDataPartitionReader(
int startSubIndex, int endSubIndex, long streamId, Channel channel) {
- SegmentMapPartitionDataReader mapDataPartitionReader =
+ SegmentMapPartitionDataReader mapPartitionDataReader =
new SegmentMapPartitionDataReader(
startSubIndex,
endSubIndex,
@@ -70,7 +70,7 @@ public class SegmentMapPartitionData extends MapPartitionData
{
startSubIndex,
endSubIndex,
streamId);
- readers.put(streamId, mapDataPartitionReader);
+ readers.put(streamId, mapPartitionDataReader);
}
@Override
@@ -85,7 +85,7 @@ public class SegmentMapPartitionData extends MapPartitionData
{
@Override
public String toString() {
- return String.format("SegmentMapDataPartition{filePath=%s}",
diskFileInfo.getFilePath());
+ return String.format("SegmentMapPartitionData{filePath=%s}",
diskFileInfo.getFilePath());
}
public void notifyRequiredSegmentId(int segmentId, long streamId, int
subPartitionId) {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java
index e65476739..522036b6a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java
@@ -211,7 +211,7 @@ public class SegmentMapPartitionDataReader extends
MapPartitionDataReader {
@Override
public String toString() {
- final StringBuilder sb = new
StringBuilder("SegmentMapDataPartitionReader{");
+ final StringBuilder sb = new
StringBuilder("SegmentMapPartitionDataReader{");
sb.append("startPartitionIndex=").append(startPartitionIndex);
sb.append(", endPartitionIndex=").append(endPartitionIndex);
sb.append(", streamId=").append(streamId);
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 64624e9ad..f01792729 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -426,7 +426,7 @@ private[deploy] class Controller(
logError(s"Shuffle $shuffleKey doesn't exist!")
context.reply(
CommitFilesResponse(
- StatusCode.SHUFFLE_NOT_REGISTERED,
+ StatusCode.SHUFFLE_UNREGISTERED,
List.empty.asJava,
List.empty.asJava,
primaryIds,
@@ -681,7 +681,7 @@ private[deploy] class Controller(
logWarning(s"Shuffle $shuffleKey not registered!")
context.reply(
DestroyWorkerSlotsResponse(
- StatusCode.SHUFFLE_NOT_REGISTERED,
+ StatusCode.SHUFFLE_UNREGISTERED,
primaryLocations,
replicaLocations))
return
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
index d886cc864..15e28c856 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
@@ -102,9 +102,9 @@ public class CreditStreamManagerSuiteJ {
streamIdConsumer, channel, shuffleKey, 0, 1, 1, diskFileInfo);
MapPartitionData mapPartitionData1 =
-
creditStreamManager.getStreams().get(registerStream1).getMapDataPartition();
+
creditStreamManager.getStreams().get(registerStream1).getMapPartitionData();
MapPartitionData mapPartitionData2 =
-
creditStreamManager.getStreams().get(registerStream2).getMapDataPartition();
+
creditStreamManager.getStreams().get(registerStream2).getMapPartitionData();
Assert.assertEquals(mapPartitionData1, mapPartitionData2);
mapPartitionData1.getStreamReader(registerStream1).recycle();