This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1364a828e [#2229] feat(server,client): Introduce OutputUtils class to
reduce log size (#2230)
1364a828e is described below
commit 1364a828ef376109000032547e6c3361405770db
Author: maobaolong <[email protected]>
AuthorDate: Wed Nov 6 18:54:48 2024 +0800
[#2229] feat(server,client): Introduce OutputUtils class to reduce log size
(#2230)
### What changes were proposed in this pull request?
Introduce OutputUtils class to reduce log size
### Why are the changes needed?
Fix: #2229
### Does this PR introduce _any_ user-facing change?
Log become useful but tiny enough.
### How was this patch tested?
new UT
---
.../apache/uniffle/common/util/OutputUtils.java | 101 +++++++++++++++++++++
.../uniffle/common/util/OutputUtilsTest.java | 52 +++++++++++
.../client/impl/grpc/ShuffleServerGrpcClient.java | 5 +-
.../uniffle/server/ShuffleServerGrpcService.java | 15 ++-
.../apache/uniffle/server/ShuffleTaskManager.java | 6 +-
5 files changed, 174 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/OutputUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/OutputUtils.java
new file mode 100644
index 000000000..b811b51cc
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/OutputUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.List;
+
+public class OutputUtils {
+
+ /**
+ * Convert a list of number into a segment string.
+ *
+ * @param numbers the list of number
+ * @return the segment string
+ */
+ public static String listToSegment(List<Integer> numbers) {
+ return listToSegment(numbers, 1, Long.MAX_VALUE);
+ }
+
+ /**
+ * Convert a list of number into a segment string.
+ *
+ * @param numbers the list of number
+ * @param threshold if the segment size is larger than this threshold, it
will be converted into a
+ * segment string
+ * @return the segment string
+ */
+ public static String listToSegment(List<Integer> numbers, long threshold) {
+ return listToSegment(numbers, threshold, Long.MAX_VALUE);
+ }
+
+ /**
+ * Convert a list of number into a segment string.
+ *
+ * @param numbers the list of number
+ * @param threshold if the segment size is larger than this threshold, it
will be converted into a
+ * segment string
+ * @param limit the maximum number of elements in the segment
+ * @return the segment string
+ */
+ public static String listToSegment(List<Integer> numbers, long threshold,
long limit) {
+ if (numbers == null || numbers.isEmpty()) {
+ return "[]";
+ }
+ if (threshold < 1 || numbers.size() <= threshold) {
+ return numbers.toString();
+ }
+
+ StringBuilder result = new StringBuilder();
+ int start = numbers.get(0);
+ int end = start;
+
+ long rangeCount = 0;
+ for (int i = 1; i < numbers.size(); i++) {
+ if (numbers.get(i) == numbers.get(i - 1) + 1) {
+ end = numbers.get(i);
+ } else {
+ if (rangeCount < limit) {
+ appendRange(result, start, end);
+ }
+ rangeCount++;
+ start = numbers.get(i);
+ end = start;
+ }
+ }
+ rangeCount++;
+ if (rangeCount < limit) {
+ // Append the last range
+ appendRange(result, start, end);
+ } else {
+ result.append("...").append(rangeCount - limit).append(" more
ranges...");
+ }
+
+ return result.toString();
+ }
+
+ private static void appendRange(StringBuilder result, int start, int end) {
+ if (result.length() > 0) {
+ result.append(", ");
+ }
+ if (start == end) {
+ result.append(start);
+ } else {
+ result.append("[").append(start).append("~").append(end).append("]");
+ }
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/OutputUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/OutputUtilsTest.java
new file mode 100644
index 000000000..5545876d4
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/util/OutputUtilsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OutputUtilsTest {
+ @Test
+ public void test() {
+ List<Integer> numbers =
+ Arrays.asList(
+ 9527, 9528, 9529, 9530, 11375, 11376, 11377, 11378, 11379, 12000,
12001, 12002, 12003,
+ 12004);
+ assertEquals("[9527~9530], [11375~11379], [12000~12004]",
OutputUtils.listToSegment(numbers));
+ assertEquals(
+ "[9527~9530], [11375~11379], [12000~12004]",
OutputUtils.listToSegment(numbers, 4));
+ assertEquals(
+ "[9527, 9528, 9529, 9530, 11375, 11376, 11377, 11378,"
+ + " 11379, 12000, 12001, 12002, 12003, 12004]",
+ OutputUtils.listToSegment(numbers, 20));
+ // limit
+ assertEquals(
+ "[9527~9530], [11375~11379]...1 more ranges...",
OutputUtils.listToSegment(numbers, 1, 2));
+ assertEquals("[9527~9530]...2 more ranges...",
OutputUtils.listToSegment(numbers, 1, 1));
+ assertEquals("...3 more ranges...", OutputUtils.listToSegment(numbers, 1,
0));
+
+ // corner case
+ assertEquals("[9527]", OutputUtils.listToSegment(Arrays.asList(9527)));
+ assertEquals("[]", OutputUtils.listToSegment(Arrays.asList()));
+ assertEquals("[]", OutputUtils.listToSegment(null));
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 98180d647..6bd1f8990 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -77,6 +77,7 @@ import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.OutputUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
@@ -316,7 +317,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
"Requiring buffer for appId: {}, shuffleId: {}, partitionIds: {}
with {} bytes from {}:{}",
appId,
shuffleId,
- partitionIds,
+ OutputUtils.listToSegment(partitionIds, 10),
requireSize,
host,
port);
@@ -355,7 +356,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
"Can't require buffer for appId: {}, shuffleId: {}, partitionIds:
{} with {} bytes from {}:{} due to {}, sleep and try[{}] again",
appId,
shuffleId,
- partitionIds,
+ OutputUtils.listToSegment(partitionIds, 10),
requireSize,
host,
port,
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 85fe7fa6e..b46418bc8 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -59,6 +59,7 @@ import
org.apache.uniffle.common.rpc.ClientContextServerInterceptor;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ByteBufUtils;
+import org.apache.uniffle.common.util.OutputUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
@@ -711,6 +712,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
if (request.getPartitionIdsList() != null) {
auditArgs += ", partitionIdsSize=" +
request.getPartitionIdsList().size();
}
+ if (request.getPartitionIdsList() != null) {
+ auditArgs +=
+ ", partitionIds=" +
OutputUtils.listToSegment(request.getPartitionIdsList(), 1, 10);
+ }
auditContext.withArgs(auditArgs);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
@@ -992,8 +997,14 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
request.getBlockIdLayout().getTaskAttemptIdBits());
auditContext.withAppId(appId).withShuffleId(shuffleId);
+ String partitionIdsOutput = OutputUtils.listToSegment(partitionsList, 1,
10);
auditContext.withArgs(
- "partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" +
blockIdLayout);
+ "partitionsListSize="
+ + partitionsList.size()
+ + ", partitionIds="
+ + partitionIdsOutput
+ + ", blockIdLayout="
+ + blockIdLayout);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
@@ -1012,7 +1023,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
GetShuffleResultForMultiPartResponse reply;
byte[] serializedBlockIds = null;
String requestInfo =
- "appId[" + appId + "], shuffleId[" + shuffleId + "], partitions" +
partitionsList;
+ "appId[" + appId + "], shuffleId[" + shuffleId + "], partitions=" +
partitionIdsOutput;
ByteString serializedBlockIdsBytes = ByteString.EMPTY;
try {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index af37646a7..b80daba20 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -71,6 +71,7 @@ import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.OutputUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.util.UnitConverter;
@@ -604,7 +605,10 @@ public class ShuffleTaskManager {
String errorMessage =
String.format(
"Huge partition is limited to writing. appId: %s, shuffleId:
%s, partitionIds: %s, partitionUsedDataSize: %s",
- appId, shuffleId, partitionIds, partitionUsedDataSize);
+ appId,
+ shuffleId,
+ OutputUtils.listToSegment(partitionIds, 10),
+ partitionUsedDataSize);
LOG.error(errorMessage);
throw new NoBufferForHugePartitionException(errorMessage);
}