This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ddf83843 [Improvement] Small refactor for code quality (#394)
ddf83843 is described below

commit ddf83843a9fbce8155d159815c351b16eb5a9704
Author: advancedxy <[email protected]>
AuthorDate: Mon Dec 12 00:01:05 2022 +0800

    [Improvement] Small refactor for code quality (#394)
    
    ### What changes were proposed in this pull request?
    Tweaks some wording, fixes some typos, and removes some dead code.
    
    ### Why are the changes needed?
    To improve code quality.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing UTs.
---
 .../spark/shuffle/reader/RssShuffleDataIterator.java |  6 ------
 .../apache/spark/shuffle/writer/WriterBuffer.java    |  2 +-
 .../spark/shuffle/DelegationRssShuffleManager.java   |  3 +--
 .../spark/shuffle/writer/RssShuffleWriter.java       |  9 ++++++---
 .../uniffle/client/factory/ShuffleClientFactory.java |  4 ++--
 .../uniffle/client/impl/ShuffleReadClientImpl.java   |  4 +---
 .../uniffle/client/impl/ShuffleWriteClientImpl.java  |  4 ++--
 .../org/apache/uniffle/client/util/ClientUtils.java  |  4 ++--
 .../common/filesystem/HadoopFilesystemProvider.java  |  2 +-
 .../apache/uniffle/common/metrics/GRPCMetrics.java   | 20 ++++++++++----------
 .../common/rpc/MonitoringServerTransportFilter.java  |  6 +++---
 .../apache/uniffle/coordinator/CoordinatorConf.java  |  2 +-
 .../uniffle/test/CoordinatorGrpcServerTest.java      |  8 ++++----
 .../org/apache/uniffle/test/CoordinatorGrpcTest.java |  4 ++--
 .../client/factory/CoordinatorClientFactory.java     |  2 +-
 .../java/org/apache/uniffle/server/HealthCheck.java  |  4 ++--
 .../org/apache/uniffle/server/ShuffleServer.java     |  6 +++---
 .../apache/uniffle/server/ShuffleTaskManager.java    |  2 +-
 .../apache/uniffle/server/buffer/ShuffleBuffer.java  |  2 +-
 .../uniffle/server/buffer/ShuffleBufferManager.java  |  6 +++---
 .../uniffle/server/storage/HdfsStorageManager.java   |  2 +-
 .../uniffle/server/storage/LocalStorageManager.java  |  2 +-
 .../uniffle/server/storage/MultiStorageManager.java  |  4 ++--
 .../uniffle/server/storage/StorageManager.java       |  2 +-
 .../uniffle/server/ShuffleFlushManagerTest.java      |  2 +-
 .../uniffle/server/ShuffleTaskManagerTest.java       |  2 +-
 .../server/storage/LocalStorageManagerTest.java      |  2 +-
 .../uniffle/storage/common/LocalStorageMeta.java     |  4 ++--
 .../uniffle/storage/util/ShuffleStorageUtils.java    | 10 +++++-----
 29 files changed, 62 insertions(+), 68 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 7dec34e8..dd01d517 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.reader;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import com.esotericsoftware.kryo.io.Input;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
@@ -54,7 +53,6 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
   private long readTime = 0;
   private long serializeTime = 0;
   private long decompressTime = 0;
-  private Input deserializationInput = null;
   private DeserializationStream deserializationStream = null;
   private ByteBufInputStream byteBufInputStream = null;
   private long compressedBytesLength = 0;
@@ -95,13 +93,9 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
         LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
       }
     }
-    if (deserializationInput != null) {
-      deserializationInput.close();
-    }
     if (deserializationStream != null) {
       deserializationStream.close();
     }
-    deserializationInput = null;
     deserializationStream = null;
     byteBufInputStream = null;
   }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
index a1427c71..22bc90f9 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
@@ -53,7 +53,7 @@ public class WriterBuffer {
     try {
       System.arraycopy(recordBuffer, 0, buffer, nextOffset, length);
     } catch (Exception e) {
-      LOG.error("Unexpect exception for System.arraycopy, length[" + length + 
"], nextOffset["
+      LOG.error("Unexpected exception for System.arraycopy, length[" + length 
+ "], nextOffset["
           + nextOffset + "], bufferSize[" + bufferSize + "]");
       throw e;
     }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 79016a6d..592c28a9 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -107,8 +107,7 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   }
 
   private boolean tryAccessCluster() {
-    String accessId = sparkConf.get(
-        RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
+    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"").trim();
     if (StringUtils.isEmpty(accessId)) {
       LOG.warn("Access id key is empty");
       return false;
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 5ea1a54f..4a6f4428 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -168,6 +168,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private void writeImpl(Iterator<Product2<K,V>> records) {
     List<ShuffleBlockInfo> shuffleBlockInfos = null;
     Set<Long> blockIds = Sets.newConcurrentHashSet();
+    boolean isCombine = shuffleDependency.mapSideCombine();
+    Function1 createCombiner = null;
+    if (isCombine) {
+      createCombiner = shuffleDependency.aggregator().get().createCombiner();
+    }
     while (records.hasNext()) {
       // Task should fast fail when sending data failed
       checkIfBlocksFailed();
@@ -175,9 +180,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       Product2<K, V> record = records.next();
       K key = record._1();
       int partition = getPartition(key);
-      boolean isCombine = shuffleDependency.mapSideCombine();
       if (isCombine) {
-        Function1 createCombiner = 
shuffleDependency.aggregator().get().createCombiner();
         Object c = createCombiner.apply(record._2());
         shuffleBlockInfos = bufferManager.addRecord(partition, record._1(), c);
       } else {
@@ -216,7 +219,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
       shuffleBlockInfoList.forEach(sbi -> {
         long blockId = sbi.getBlockId();
-        // add blockId to set, check if it is send later
+        // add blockId to set, check if it is sent later
         blockIds.add(blockId);
         // update [partition, blockIds], it will be sent to shuffle server
         int partitionId = sbi.getPartitionId();
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 547e6af0..caaa14c5 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -35,7 +35,7 @@ public class ShuffleClientFactory {
   }
 
   /**
-   * Only for MR engine, which wont used to unregister to remote 
shuffle-servers
+   * Only for MR engine, which won't used to unregister to remote 
shuffle-servers
    */
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
@@ -49,7 +49,7 @@ public class ShuffleClientFactory {
       String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
       int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize,
       int dataCommitPoolSize, int unregisterThreadPoolSize, int 
unregisterRequestTimeoutSec) {
-    // If replica > replicaWrite, blocks maybe will be sended for 2 rounds.
+    // If replica > replicaWrite, blocks maybe be sent for 2 rounds.
     // We need retry less times in this case for let the first round fail fast.
     if (replicaSkipEnabled && replica > replicaWrite) {
       retryMax = retryMax / 2;
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index eacefe78..333fe527 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -159,7 +159,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
       return null;
     }
 
-    // if need request new data from shuffle server
+    // if client need request new data from shuffle server
     if (bufferSegmentQueue.isEmpty()) {
       if (read() <= 0) {
         return null;
@@ -186,8 +186,6 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
         long actualCrc = -1;
         try {
           long start = System.currentTimeMillis();
-          copyTime.addAndGet(System.currentTimeMillis() - start);
-          start = System.currentTimeMillis();
           expectedCrc = bs.getCrc();
           actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), 
bs.getLength());
           crcCheckTime.addAndGet(System.currentTimeMillis() - start);
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 1d76a41a..60a775f0 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -115,7 +115,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       int replicaWrite,
       int replicaRead,
       boolean replicaSkipEnabled,
-      int dataTranferPoolSize,
+      int dataTransferPoolSize,
       int dataCommitPoolSize,
       int unregisterThreadPoolSize,
       int unregisterRequestTimeSec) {
@@ -129,7 +129,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     this.replicaWrite = replicaWrite;
     this.replicaRead = replicaRead;
     this.replicaSkipEnabled = replicaSkipEnabled;
-    this.dataTransferPool = Executors.newFixedThreadPool(dataTranferPoolSize);
+    this.dataTransferPool = Executors.newFixedThreadPool(dataTransferPoolSize);
     this.dataCommitPoolSize = dataCommitPoolSize;
     this.unregisterThreadPoolSize = unregisterThreadPoolSize;
     this.unregisterRequestTimeSec = unregisterRequestTimeSec;
diff --git 
a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java 
b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index 803d65bc..0bdf7cf0 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -30,7 +30,7 @@ import org.apache.uniffle.storage.util.StorageType;
 
 public class ClientUtils {
 
-  // BlockId is long and composed by partitionId, executorId and AtomicInteger
+  // BlockId is long and composed of partitionId, executorId and AtomicInteger.
   // AtomicInteger is first 19 bit, max value is 2^19 - 1
   // partitionId is next 24 bit, max value is 2^24 - 1
   // taskAttemptId is rest of 20 bit, max value is 2^20 - 1
@@ -118,7 +118,7 @@ public class ClientUtils {
   public static void validateTestModeConf(boolean testMode, String 
storageType) {
     if (!testMode && (StorageType.LOCALFILE.name().equals(storageType)
             || (StorageType.HDFS.name()).equals(storageType))) {
-      throw new IllegalArgumentException("RSS storage type about LOCALFILE and 
HDFS should be used in test mode, "
+      throw new IllegalArgumentException("LOCALFILE or HDFS storage type 
should be used in test mode only, "
               + "because of the poor performance of these two types.");
     }
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
 
b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
index ada4d06d..96affc3b 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
@@ -32,7 +32,7 @@ import 
org.apache.uniffle.common.security.SecurityContextFactory;
 
 /**
  * This HadoopFilesystemProvider will provide the only entrypoint to get the 
hadoop filesystem whether
- * the dfs cluster is kerberized or not.
+ * the dfs cluster is kerberos enabled or not.
  */
 public class HadoopFilesystemProvider {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopFilesystemProvider.class);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 7d8b4846..c693604d 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -33,10 +33,10 @@ public abstract class GRPCMetrics {
   private static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS = 
"grpc_server_executor_active_threads";
   public static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY = 
"grpcServerExecutorBlockingQueueSize";
   private static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE = 
"grpc_server_executor_blocking_queue_size";
-  public static final String GRCP_SERVER_CONNECTION_NUMBER_KEY = 
"grpcServerConnectionNumber";
-  private static final String GRCP_SERVER_CONNECTION_NUMBER = 
"grpc_server_connection_number";
+  public static final String GRPC_SERVER_CONNECTION_NUMBER_KEY = 
"grpcServerConnectionNumber";
+  private static final String GRPC_SERVER_CONNECTION_NUMBER = 
"grpc_server_connection_number";
 
-  private boolean isRegister = false;
+  private boolean isRegistered = false;
   protected Map<String, Counter> counterMap = Maps.newConcurrentMap();
   protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap();
   protected Map<String, Summary> transportTimeSummaryMap = 
Maps.newConcurrentMap();
@@ -48,11 +48,11 @@ public abstract class GRPCMetrics {
   public abstract void registerMetrics();
 
   public void register(CollectorRegistry collectorRegistry) {
-    if (!isRegister) {
+    if (!isRegistered) {
       metricsManager = new MetricsManager(collectorRegistry);
       registerGeneralMetrics();
       registerMetrics();
-      isRegister = true;
+      isRegistered = true;
     }
   }
 
@@ -66,13 +66,13 @@ public abstract class GRPCMetrics {
         metricsManager.addGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
     );
     gaugeMap.putIfAbsent(
-        GRCP_SERVER_CONNECTION_NUMBER_KEY,
-        metricsManager.addGauge(GRCP_SERVER_CONNECTION_NUMBER)
+            GRPC_SERVER_CONNECTION_NUMBER_KEY,
+        metricsManager.addGauge(GRPC_SERVER_CONNECTION_NUMBER)
     );
   }
 
   public void setGauge(String tag, double value) {
-    if (isRegister) {
+    if (isRegistered) {
       Gauge gauge = gaugeMap.get(tag);
       if (gauge != null) {
         gauge.set(value);
@@ -81,7 +81,7 @@ public abstract class GRPCMetrics {
   }
 
   public void incCounter(String methodName) {
-    if (isRegister) {
+    if (isRegistered) {
       Gauge gauge = gaugeMap.get(methodName);
       if (gauge != null) {
         gauge.inc();
@@ -96,7 +96,7 @@ public abstract class GRPCMetrics {
   }
 
   public void decCounter(String methodName) {
-    if (isRegister) {
+    if (isRegistered) {
       Gauge gauge = gaugeMap.get(methodName);
       if (gauge != null) {
         gauge.dec();
diff --git 
a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
 
b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
index 62ad537a..085b13fd 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
@@ -24,7 +24,7 @@ import io.grpc.ServerTransportFilter;
 
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 
-import static 
org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
+import static 
org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
 
 public class MonitoringServerTransportFilter extends ServerTransportFilter {
   private final AtomicLong connectionSize = new AtomicLong(0);
@@ -35,12 +35,12 @@ public class MonitoringServerTransportFilter extends 
ServerTransportFilter {
   }
 
   public Attributes transportReady(Attributes transportAttrs) {
-    grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY, 
connectionSize.incrementAndGet());
+    grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, 
connectionSize.incrementAndGet());
     return super.transportReady(transportAttrs);
   }
 
   public void transportTerminated(Attributes transportAttrs) {
-    grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY, 
connectionSize.decrementAndGet());
+    grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, 
connectionSize.decrementAndGet());
     super.transportTerminated(transportAttrs);
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index bea53018..6f2b9413 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -33,7 +33,7 @@ import static 
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrat
 
 /**
  * Configuration for Coordinator Service and rss-cluster, including service 
port,
- * heartbeat interval and etc.
+ * heartbeat interval, etc.
  */
 public class CoordinatorConf extends RssBaseConf {
 
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 4b5016df..99abcee4 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -30,7 +30,7 @@ import 
org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
 import org.apache.uniffle.proto.CoordinatorServerGrpc;
 import org.apache.uniffle.proto.RssProtos;
 
-import static 
org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
+import static 
org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -65,14 +65,14 @@ public class CoordinatorGrpcServerTest {
     grpcServer.start();
 
     // case1: test the single one connection metric
-    double connSize = 
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+    double connSize = 
grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
     assertEquals(0, connSize);
 
     CoordinatorGrpcClient coordinatorGrpcClient = new 
CoordinatorGrpcClient("localhost", 20001);
     coordinatorGrpcClient.registerApplicationInfo(
         new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, 
"user"));
 
-    connSize = 
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+    connSize = 
grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
     assertEquals(1, connSize);
 
     // case2: test the multiple connections
@@ -81,7 +81,7 @@ public class CoordinatorGrpcServerTest {
     client1.registerApplicationInfo(new 
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
     client2.registerApplicationInfo(new 
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
 
-    connSize = 
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+    connSize = 
grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
     assertEquals(3, connSize);
 
     grpcServer.stop();
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 7573c616..4b20b40c 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -48,7 +48,7 @@ import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 
-import static 
org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
+import static 
org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -265,7 +265,7 @@ public class CoordinatorGrpcTest extends 
CoordinatorTestBase {
     assertEquals(oldValue + 1, newValue, 0.5);
 
     double connectionSize = coordinators.get(0)
-        
.getGrpcMetrics().getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+        
.getGrpcMetrics().getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
     assertTrue(connectionSize > 0);
   }
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index b9f23058..fab74160 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -50,7 +50,7 @@ public class CoordinatorClientFactory {
     LOG.info("Start to create coordinator clients from {}", coordinators);
     List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
     String[] coordinatorList = coordinators.trim().split(",");
-    if (coordinatorList.length <= 0) {
+    if (coordinatorList.length == 0) {
       String msg = "Invalid " + coordinators;
       LOG.error(msg);
       throw new RuntimeException(msg);
diff --git a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java 
b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
index ac21024b..ecfa28b1 100644
--- a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
+++ b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
@@ -30,8 +30,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * HealthCheck will check every server whether has the ability to process 
shuffle data. Currently, we only support disk
- * checker. If enough disks don't have enough disk space, server will become 
unhealthy, and only enough disks
+ * HealthCheck will check every server whether it has the ability to process 
shuffle data. Currently, we only support
+ * disk checker. If enough disks don't have enough disk space, server will 
become unhealthy, and only enough disks
  * have enough disk space, server will become healthy again.
  **/
 public class HealthCheck {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 10d59f8a..56e1a68a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -179,9 +179,9 @@ public class ShuffleServer {
 
     boolean healthCheckEnable = 
shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE);
     if (healthCheckEnable) {
-      List<Checker> buildInCheckers = Lists.newArrayList();
-      buildInCheckers.add(storageManager.getStorageChecker());
-      healthCheck = new HealthCheck(isHealthy, shuffleServerConf, 
buildInCheckers);
+      List<Checker> builtInCheckers = Lists.newArrayList();
+      builtInCheckers.add(storageManager.getStorageChecker());
+      healthCheck = new HealthCheck(isHealthy, shuffleServerConf, 
builtInCheckers);
       healthCheck.start();
     }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 71371ae8..c197b525 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -465,7 +465,7 @@ public class ShuffleTaskManager {
     LOG.info("Start check leak shuffle data");
     try {
       Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
-      storageManager.checkAndClearLeakShuffleData(appIds);
+      storageManager.checkAndClearLeakedShuffleData(appIds);
       LOG.info("Finish check leak shuffle data");
     } catch (Exception e) {
       LOG.warn("Error happened in checkLeakShuffleData", e);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 9b0a50a2..187d6d65 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -255,7 +255,7 @@ public class ShuffleBuffer {
       try {
         System.arraycopy(block.getData(), 0, data, offset, block.getLength());
       } catch (Exception e) {
-        LOG.error("Unexpect exception for System.arraycopy, length["
+        LOG.error("Unexpected exception for System.arraycopy, length["
             + block.getLength() + "], offset["
             + offset + "], dataLength[" + data.length + "]", e);
         throw e;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ddad74b3..a5aa9973 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -103,7 +103,7 @@ public class ShuffleBufferManager {
   public StatusCode cacheShuffleData(String appId, int shuffleId,
       boolean isPreAllocated, ShufflePartitionedData spd) {
     if (!isPreAllocated && isFull()) {
-      LOG.warn("Got unexpect data, can't cache it because the space is full");
+      LOG.warn("Got unexpected data, can't cache it because the space is 
full");
       return StatusCode.NO_BUFFER;
     }
 
@@ -182,7 +182,7 @@ public class ShuffleBufferManager {
 
   void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
       int shuffleId, int startPartition, int endPartition) {
-    // When we use multistorage and trigger single buffer flush, the buffer 
size should be bigger
+    // When we use multi storage and trigger single buffer flush, the buffer 
size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold 
storage will be useless.
     if (this.bufferFlushEnabled && buffer.getSize() > 
this.bufferFlushThreshold) {
       flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
@@ -428,7 +428,7 @@ public class ShuffleBufferManager {
 
     Map<String, Set<Integer>> pickedShuffle = Maps.newHashMap();
     // The algorithm here is to flush data size > highWaterMark - lowWaterMark
-    // the remain data in buffer maybe more than lowWaterMark
+    // the remaining data in buffer maybe more than lowWaterMark
     // because shuffle server is still receiving data, but it should be ok
     long expectedFlushSize = highWaterMark - lowWaterMark;
     long pickedFlushSize = 0L;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 09c7dd03..516391cd 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -138,7 +138,7 @@ public class HdfsStorageManager extends 
SingleStorageManager {
   }
 
   @Override
-  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+  public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
   }
 
   public HdfsStorage getStorageByAppId(String appId) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4e47a34f..ee099939 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -223,7 +223,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
   }
 
   @Override
-  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+  public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
     Set<String> appIdsOnStorages = new HashSet<>();
     for (LocalStorage localStorage : localStorages) {
       if (!localStorage.isCorrupted()) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index ee53ff03..53039279 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -140,8 +140,8 @@ public class MultiStorageManager implements StorageManager {
   }
 
   @Override
-  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
-    warmStorageManager.checkAndClearLeakShuffleData(appIds);
+  public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
+    warmStorageManager.checkAndClearLeakedShuffleData(appIds);
   }
 
   public void removeResources(PurgeEvent event) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 2ba7b4c5..2a487535 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -53,5 +53,5 @@ public interface StorageManager {
 
   // todo: add an interface that check storage isHealthy
 
-  void checkAndClearLeakShuffleData(Collection<String> appIds);
+  void checkAndClearLeakedShuffleData(Collection<String> appIds);
 }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 8d27f0f8..3e8afaa9 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -258,7 +258,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     size = storage.getHandlerSize();
     assertEquals(0, size);
     // fs create a remoteStorage for appId2 before remove resources,
-    // but thecache from appIdToStorages has removed, so we need to delete this path in hdfs
+    // but the cache from appIdToStorages has been removed, so we need to delete this path in hdfs
     Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/");
     assertTrue(fs.mkdirs(path));
     storageManager.removeResources(
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 129806c3..31e48903 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class ShuffleTaskManagerTest extends HdfsTestBase {
 
-  private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
+  private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
 
   @AfterAll
   public static void tearDown() {
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index da6e5ae5..da536099 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -162,7 +162,7 @@ public class LocalStorageManagerTest {
     localStorageManager = new LocalStorageManager(conf);
     assertEquals(2, localStorageManager.getStorages().size());
 
-    // case4: only have 1 candidates, but exceed the number of rss.server.localstorage.initialize.max.fail.number
+    // case4: only have 1 candidate, but exceed the number of rss.server.localstorage.initialize.max.fail.number
     conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/a/rss-data", "/tmp/rss-data-1"));
     conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 0L);
     try {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 07513981..cdfea458 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  *  Metadata has three dimensions from top to down including disk, shuffle, partition.
- *  And each dimensions contains two aspects, status data and indicator data.
+ *  And each dimension contains two aspects, status data and indicator data.
  *  Disk status data contains writable flag, Shuffle status data contains stable, uploading, deleting flag.
  *  Disk indicator data contains size, fileNum, shuffleNum, Shuffle indicator contains size, partition list,
  *  uploaded partition list and uploaded size.
@@ -48,7 +48,7 @@ public class LocalStorageMeta {
   // todo: add ut
   public List<String> getSortedShuffleKeys(boolean checkRead, int hint) {
     // Filter the unread shuffle is checkRead is true
-    // Filter the remain size is 0
+    // Filter the remaining size is 0
     List<Map.Entry<String, ShuffleMeta>> shuffleMetaList = shuffleMetaMap
         .entrySet()
         .stream()
diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index 7deda234..71cca83c 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -78,7 +78,7 @@ public class ShuffleStorageUtils {
       } else {
         Collections.sort(segments);
         long start = -1;
-        long lastestPosition = -1;
+        long latestPosition = -1;
         long skipThreshold = readBufferSize / 2;
         long lastPosition = Long.MAX_VALUE;
         List<BufferSegment> bufferSegments = Lists.newArrayList();
@@ -94,16 +94,16 @@ public class ShuffleStorageUtils {
             bufferSegments = Lists.newArrayList();
             start = segment.getOffset();
           }
-          lastestPosition = segment.getOffset() + segment.getLength();
+          latestPosition = segment.getOffset() + segment.getLength();
           bufferSegments.add(new BufferSegment(segment.getBlockId(),
               segment.getOffset() - start, segment.getLength(),
               segment.getUncompressLength(), segment.getCrc(), segment.getTaskAttemptId()));
-          if (lastestPosition - start >= readBufferSize) {
+          if (latestPosition - start >= readBufferSize) {
             dataFileSegments.add(new DataFileSegment(
-                path, start, (int) (lastestPosition - start), bufferSegments));
+                path, start, (int) (latestPosition - start), bufferSegments));
             start = -1;
           }
-          lastPosition = lastestPosition;
+          lastPosition = latestPosition;
         }
         if (start > -1) {
           dataFileSegments.add(new DataFileSegment(path, start, (int) (lastPosition - start), bufferSegments));


Reply via email to