This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 01d37beb [ISSUE-489][Minor] Cleanup some code (#490)
01d37beb is described below
commit 01d37bebf522d9be8a1a28a45eb4ba590011807c
Author: 王杰 <[email protected]>
AuthorDate: Mon Jan 16 17:38:07 2023 +0800
[ISSUE-489][Minor] Cleanup some code (#490)
### What changes were proposed in this pull request?
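Simplify and standardize some code without changing behavior: use `Math.min`/`Math.max` instead of hand-written comparisons, remove redundant casts, drop a catch block that only rethrows, return the result of `cmd.execute()` directly instead of via a temporary variable, call the static `SecurityConfig.newBuilder()` through the class rather than through a null-valued variable, and mark `shuffleBufferManager` as `final`.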
### Why are the changes needed?
Fixes #489
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by the existing CI; the patch only touches files under `src/main`.
---
.../main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java | 7 ++-----
.../java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java | 2 --
.../java/org/apache/spark/shuffle/reader/RssShuffleReader.java | 2 +-
.../main/java/org/apache/uniffle/common/config/ConfigUtils.java | 2 +-
.../src/main/java/org/apache/uniffle/common/util/RetryUtils.java | 3 +--
.../strategy/assignment/PartitionBalanceAssignmentStrategy.java | 2 +-
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java | 2 +-
.../main/java/org/apache/uniffle/server/ShuffleTaskManager.java | 4 ++--
.../org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java | 2 +-
.../uniffle/storage/handler/impl/HdfsShuffleReadHandler.java | 2 +-
10 files changed, 11 insertions(+), 17 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 36d5ab96..cb76bef2 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -203,10 +203,7 @@ public class SortWriteBufferManager<K, V> {
return o2.getDataLength() - o1.getDataLength();
}
});
- int sendSize = batch;
- if (batch > waitSendBuffers.size()) {
- sendSize = waitSendBuffers.size();
- }
+ int sendSize = Math.min(batch, waitSendBuffers.size());
Iterator<SortWriteBuffer<K, V>> iterator = waitSendBuffers.iterator();
int index = 0;
List<ShuffleBlockInfo> shuffleBlocks = Lists.newArrayList();
@@ -318,7 +315,7 @@ public class SortWriteBufferManager<K, V> {
final byte[] compressed = codec.compress(data);
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
- final long blockId = RssMRUtils.getBlockId((long)partitionId,
taskAttemptId, getNextSeqNo(partitionId));
+ final long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId,
getNextSeqNo(partitionId));
uncompressedDataLen += data.length;
// add memory to indicate bytes which will be sent to shuffle server
inSendListBytes.addAndGet(wb.getDataLength());
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
index 8e0859cc..505616f1 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
@@ -131,8 +131,6 @@ public class RssFetcher<K,V> {
// Do shuffle
metrics.threadBusy();
copyFromRssServer();
- } catch (Exception e) {
- throw e;
} finally {
metrics.threadFree();
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 88b2d0bb..1ac1641a 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -161,7 +161,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
Function1<TaskContext, Void> fn1 = new AbstractFunction1<TaskContext,
Void>() {
public Void apply(TaskContext context) {
sorter.stop();
- return (Void) null;
+ return null;
}
};
context.addTaskCompletionListener(fn1);
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index c93334e0..578276ad 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -212,6 +212,6 @@ public class ConfigUtils {
value -> value > 0;
public static final Function<Double, Boolean> PERCENTAGE_DOUBLE_VALIDATOR =
- (Function<Double, Boolean>) value -> Double.compare(value, 100.0) <= 0
&& Double.compare(value, 0.0) >= 0;
+ value -> Double.compare(value, 100.0) <= 0 && Double.compare(value, 0.0)
>= 0;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
index 03f817ff..6a4c1ae9 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
@@ -51,8 +51,7 @@ public class RetryUtils {
int retry = 0;
while (true) {
try {
- T ret = cmd.execute();
- return ret;
+ return cmd.execute();
} catch (Throwable t) {
retry++;
if ((exceptionClasses != null && !isInstanceOf(exceptionClasses, t))
|| retry >= retryTimes
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index 3bc6b14f..0b8964a4 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -96,7 +96,7 @@ public class PartitionBalanceAssignmentStrategy extends
AbstractAssignmentStrate
}
serverToPartitions = newPartitionInfos;
int averagePartitions = totalPartitionNum * replica /
clusterManager.getShuffleNodesMax();
- int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
+ int assignPartitions = Math.max(averagePartitions, 1);
nodes.sort(new Comparator<ServerNode>() {
@Override
public int compare(ServerNode o1, ServerNode o2) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index e4d09b51..796ba305 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -172,7 +172,7 @@ public class ShuffleServer {
SecurityConfig securityConfig = null;
if (shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
- securityConfig = securityConfig.newBuilder()
+ securityConfig = SecurityConfig.newBuilder()
.krb5ConfPath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KRB5_CONF_FILE))
.keytabFilePath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
.principal(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index bd23e318..d103f506 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -90,7 +90,7 @@ public class ShuffleTaskManager {
// merge different blockId of partition to one bitmap can reduce memory cost,
// but when get blockId, performance will degrade a little which can be
optimized by client configuration
private Map<String, Map<Integer, Roaring64NavigableMap[]>>
partitionsToBlockIds;
- private ShuffleBufferManager shuffleBufferManager;
+ private final ShuffleBufferManager shuffleBufferManager;
private Map<String, ShuffleTaskInfo> shuffleTaskInfos =
Maps.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
Maps.newConcurrentMap();
private Runnable clearResourceThread;
@@ -144,7 +144,7 @@ public class ShuffleTaskManager {
removeResources(event.getAppId());
}
if (event instanceof ShufflePurgeEvent) {
- removeResourcesByShuffleIds(event.getAppId(), ((ShufflePurgeEvent)
event).getShuffleIds());
+ removeResourcesByShuffleIds(event.getAppId(),
event.getShuffleIds());
}
} catch (Exception e) {
LOG.error("Exception happened when clear resource for expired
application", e);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 6b899aef..6750d6ba 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -122,7 +122,7 @@ public class HdfsFileWriter implements FileWriter,
Closeable {
headerContentBuf.flip();
fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
long len = ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size());
- if (fsDataOutputStream.getPos() != (long) len) {
+ if (fsDataOutputStream.getPos() != len) {
throw new IOException("Fail to write index header");
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 810ff1bd..3e749679 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -81,7 +81,7 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
long start = System.currentTimeMillis();
try {
byte[] indexData = indexReader.read();
- int segmentNumber = (int) (indexData.length /
FileBasedShuffleSegment.SEGMENT_SIZE);
+ int segmentNumber = indexData.length /
FileBasedShuffleSegment.SEGMENT_SIZE;
int expectedLen = segmentNumber * FileBasedShuffleSegment.SEGMENT_SIZE;
if (indexData.length != expectedLen) {
LOG.warn("Maybe the index file: {} is being written due to the
shuffle-buffer flushing.", filePrefix);