This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 01d37beb [ISSUE-489][Minor] Cleanup some code (#490)
01d37beb is described below

commit 01d37bebf522d9be8a1a28a45eb4ba590011807c
Author: 王杰 <[email protected]>
AuthorDate: Mon Jan 16 17:38:07 2023 +0800

    [ISSUE-489][Minor] Cleanup some code (#490)
    
    ### What changes were proposed in this pull request?
    
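    Simplify and standardize some of the code without changing behavior:
    replace hand-written min/max logic with Math.min/Math.max, remove
    redundant casts, drop a catch block that only rethrows, return a value
    directly instead of via a temporary local, call a static method through
    its class name rather than a variable, and mark a field as final.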
    
    ### Why are the changes needed?
    
    Fixes #489
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing CI.
---
 .../main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java | 7 ++-----
 .../java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java   | 2 --
 .../java/org/apache/spark/shuffle/reader/RssShuffleReader.java     | 2 +-
 .../main/java/org/apache/uniffle/common/config/ConfigUtils.java    | 2 +-
 .../src/main/java/org/apache/uniffle/common/util/RetryUtils.java   | 3 +--
 .../strategy/assignment/PartitionBalanceAssignmentStrategy.java    | 2 +-
 server/src/main/java/org/apache/uniffle/server/ShuffleServer.java  | 2 +-
 .../main/java/org/apache/uniffle/server/ShuffleTaskManager.java    | 4 ++--
 .../org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java    | 2 +-
 .../uniffle/storage/handler/impl/HdfsShuffleReadHandler.java       | 2 +-
 10 files changed, 11 insertions(+), 17 deletions(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java 
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 36d5ab96..cb76bef2 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -203,10 +203,7 @@ public class SortWriteBufferManager<K, V> {
         return o2.getDataLength() - o1.getDataLength();
       }
     });
-    int sendSize = batch;
-    if (batch > waitSendBuffers.size()) {
-      sendSize = waitSendBuffers.size();
-    }
+    int sendSize = Math.min(batch, waitSendBuffers.size());
     Iterator<SortWriteBuffer<K, V>> iterator = waitSendBuffers.iterator();
     int index = 0;
     List<ShuffleBlockInfo> shuffleBlocks = Lists.newArrayList();
@@ -318,7 +315,7 @@ public class SortWriteBufferManager<K, V> {
     final byte[] compressed = codec.compress(data);
     final long crc32 = ChecksumUtils.getCrc32(compressed);
     compressTime += System.currentTimeMillis() - start;
-    final long blockId = RssMRUtils.getBlockId((long)partitionId, 
taskAttemptId, getNextSeqNo(partitionId));
+    final long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId, 
getNextSeqNo(partitionId));
     uncompressedDataLen += data.length;
     // add memory to indicate bytes which will be sent to shuffle server
     inSendListBytes.addAndGet(wb.getDataLength());
diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
index 8e0859cc..505616f1 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
@@ -131,8 +131,6 @@ public class RssFetcher<K,V> {
         // Do shuffle
         metrics.threadBusy();
         copyFromRssServer();
-      } catch (Exception e) {
-        throw e;
       } finally {
         metrics.threadFree();
       }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 88b2d0bb..1ac1641a 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -161,7 +161,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
       Function1<TaskContext, Void> fn1 = new AbstractFunction1<TaskContext, 
Void>() {
         public Void apply(TaskContext context) {
           sorter.stop();
-          return (Void) null;
+          return null;
         }
       };
       context.addTaskCompletionListener(fn1);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java 
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index c93334e0..578276ad 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -212,6 +212,6 @@ public class ConfigUtils {
       value -> value > 0;
 
   public static final Function<Double, Boolean> PERCENTAGE_DOUBLE_VALIDATOR =
-      (Function<Double, Boolean>) value -> Double.compare(value, 100.0) <= 0 
&& Double.compare(value, 0.0) >= 0;
+      value -> Double.compare(value, 100.0) <= 0 && Double.compare(value, 0.0) 
>= 0;
 
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
index 03f817ff..6a4c1ae9 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
@@ -51,8 +51,7 @@ public class RetryUtils {
     int retry = 0;
     while (true) {
       try {
-        T ret = cmd.execute();
-        return ret;
+        return cmd.execute();
       } catch (Throwable t) {
         retry++;
         if ((exceptionClasses != null && !isInstanceOf(exceptionClasses, t)) 
|| retry >= retryTimes
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index 3bc6b14f..0b8964a4 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -96,7 +96,7 @@ public class PartitionBalanceAssignmentStrategy extends 
AbstractAssignmentStrate
       }
       serverToPartitions = newPartitionInfos;
       int averagePartitions = totalPartitionNum * replica / 
clusterManager.getShuffleNodesMax();
-      int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
+      int assignPartitions = Math.max(averagePartitions, 1);
       nodes.sort(new Comparator<ServerNode>() {
         @Override
         public int compare(ServerNode o1, ServerNode o2) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index e4d09b51..796ba305 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -172,7 +172,7 @@ public class ShuffleServer {
 
     SecurityConfig securityConfig = null;
     if (shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
-      securityConfig = securityConfig.newBuilder()
+      securityConfig = SecurityConfig.newBuilder()
           
.krb5ConfPath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KRB5_CONF_FILE))
           
.keytabFilePath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
           
.principal(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index bd23e318..d103f506 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -90,7 +90,7 @@ public class ShuffleTaskManager {
   // merge different blockId of partition to one bitmap can reduce memory cost,
   // but when get blockId, performance will degrade a little which can be 
optimized by client configuration
   private Map<String, Map<Integer, Roaring64NavigableMap[]>> 
partitionsToBlockIds;
-  private ShuffleBufferManager shuffleBufferManager;
+  private final ShuffleBufferManager shuffleBufferManager;
   private Map<String, ShuffleTaskInfo> shuffleTaskInfos = 
Maps.newConcurrentMap();
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = 
Maps.newConcurrentMap();
   private Runnable clearResourceThread;
@@ -144,7 +144,7 @@ public class ShuffleTaskManager {
             removeResources(event.getAppId());
           }
           if (event instanceof ShufflePurgeEvent) {
-            removeResourcesByShuffleIds(event.getAppId(), ((ShufflePurgeEvent) 
event).getShuffleIds());
+            removeResourcesByShuffleIds(event.getAppId(), 
event.getShuffleIds());
           }
         } catch (Exception e) {
           LOG.error("Exception happened when clear resource for expired 
application", e);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 6b899aef..6750d6ba 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -122,7 +122,7 @@ public class HdfsFileWriter implements FileWriter, 
Closeable {
     headerContentBuf.flip();
     fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
     long len = ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size());
-    if (fsDataOutputStream.getPos() != (long) len) {
+    if (fsDataOutputStream.getPos() != len) {
       throw new IOException("Fail to write index header");
     }
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 810ff1bd..3e749679 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -81,7 +81,7 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
     long start = System.currentTimeMillis();
     try {
       byte[] indexData = indexReader.read();
-      int segmentNumber = (int) (indexData.length / 
FileBasedShuffleSegment.SEGMENT_SIZE);
+      int segmentNumber = indexData.length / 
FileBasedShuffleSegment.SEGMENT_SIZE;
       int expectedLen = segmentNumber * FileBasedShuffleSegment.SEGMENT_SIZE;
       if (indexData.length != expectedLen) {
         LOG.warn("Maybe the index file: {} is being written due to the 
shuffle-buffer flushing.", filePrefix);

Reply via email to