This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0ad8a15d [ISSUE-542]Ensure that the elements of StatusCode and
RssProtos.StatusCode are the same (#543)
0ad8a15d is described below
commit 0ad8a15d0b906099683417002a6ee70531e94ddd
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Feb 2 20:44:11 2023 +0800
[ISSUE-542]Ensure that the elements of StatusCode and RssProtos.StatusCode
are the same (#543)
### What changes were proposed in this pull request?
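Ensure that `StatusCode` and `RssProtos.StatusCode` declare the same elements:
add the missing `ACCESS_DENIED` to `StatusCode`, and replace the hand-written
`ShuffleServerGrpcService.valueOf(StatusCode)` switch with a number-based
`StatusCode.toProto()` conversion.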
### Why are the changes needed?
If the two enums drift apart, status codes may be converted incorrectly, which
may cause bugs. Fixes #542.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests (UTs): added `StatusCodeTest`.
---
.../uniffle/server/ShuffleServerGrpcService.java | 65 ++++++++--------------
.../java/org/apache/uniffle/server/StatusCode.java | 10 +++-
.../org/apache/uniffle/server/StatusCodeTest.java | 54 ++++++++++++++++++
3 files changed, 85 insertions(+), 44 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 774a79b9..57e3d12e 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -92,27 +92,6 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
this.shuffleServer = shuffleServer;
}
- public static RssProtos.StatusCode valueOf(StatusCode code) {
- switch (code) {
- case SUCCESS:
- return RssProtos.StatusCode.SUCCESS;
- case DOUBLE_REGISTER:
- return RssProtos.StatusCode.DOUBLE_REGISTER;
- case NO_BUFFER:
- return RssProtos.StatusCode.NO_BUFFER;
- case INVALID_STORAGE:
- return RssProtos.StatusCode.INVALID_STORAGE;
- case NO_REGISTER:
- return RssProtos.StatusCode.NO_REGISTER;
- case NO_PARTITION:
- return RssProtos.StatusCode.NO_PARTITION;
- case TIMEOUT:
- return RssProtos.StatusCode.TIMEOUT;
- default:
- return RssProtos.StatusCode.INTERNAL_ERROR;
- }
- }
-
@Override
public void unregisterShuffle(RssProtos.ShuffleUnregisterRequest request,
StreamObserver<RssProtos.ShuffleUnregisterResponse>
responseStreamObserver) {
@@ -129,7 +108,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
RssProtos.ShuffleUnregisterResponse reply =
RssProtos.ShuffleUnregisterResponse
.newBuilder()
- .setStatus(valueOf(result))
+ .setStatus(result.toProto())
.setRetMsg(responseMessage)
.build();
responseStreamObserver.onNext(reply);
@@ -178,7 +157,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
reply = ShuffleRegisterResponse
.newBuilder()
- .setStatus(valueOf(result))
+ .setStatus(result.toProto())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
@@ -224,7 +203,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
responseMessage = errorMsg;
reply = SendShuffleDataResponse
.newBuilder()
- .setStatus(valueOf(StatusCode.INTERNAL_ERROR))
+ .setStatus(StatusCode.INTERNAL_ERROR.toProto())
.setRetMsg(responseMessage)
.build();
responseObserver.onNext(reply);
@@ -267,7 +246,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
if (info.getRequireSize() > alreadyReleasedSize) {
manager.releasePreAllocatedSize(info.getRequireSize() -
alreadyReleasedSize);
}
- reply =
SendShuffleDataResponse.newBuilder().setStatus(valueOf(ret)).setRetMsg(responseMessage).build();
+ reply =
SendShuffleDataResponse.newBuilder().setStatus(ret.toProto()).setRetMsg(responseMessage).build();
long costTime = System.currentTimeMillis() - start;
shuffleServer.getGrpcMetrics().recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
costTime);
LOG.debug("Cache Shuffle Data for appId[" + appId + "], shuffleId[" +
shuffleId
@@ -276,7 +255,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
} else {
reply = SendShuffleDataResponse
.newBuilder()
- .setStatus(valueOf(StatusCode.INTERNAL_ERROR))
+ .setStatus(StatusCode.INTERNAL_ERROR.toProto())
.setRetMsg("No data in request")
.build();
}
@@ -313,7 +292,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
reply = ShuffleCommitResponse
.newBuilder()
.setCommitCount(commitCount)
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
responseObserver.onNext(reply);
@@ -345,7 +324,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
FinishShuffleResponse response =
FinishShuffleResponse
.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
@@ -378,7 +357,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
RequireBufferResponse response =
RequireBufferResponse
.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRequireBufferId(requireBufferId)
.build();
responseObserver.onNext(response);
@@ -393,7 +372,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
AppHeartBeatResponse response = AppHeartBeatResponse
.newBuilder()
.setRetMsg("")
- .setStatus(valueOf(StatusCode.SUCCESS))
+ .setStatus(StatusCode.SUCCESS.toProto())
.build();
if (Context.current().isCancelled()) {
@@ -429,7 +408,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.error("Error happened when report shuffle result for " +
requestInfo, e);
}
- reply =
ReportShuffleResultResponse.newBuilder().setStatus(valueOf(status)).setRetMsg(msg).build();
+ reply =
ReportShuffleResultResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -464,7 +443,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
reply = GetShuffleResultResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.setSerializedBitmap(serializedBlockIdsBytes)
.build();
@@ -503,7 +482,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
reply = GetShuffleResultForMultiPartResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.setSerializedBitmap(serializedBlockIdsBytes)
.build();
@@ -561,7 +540,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.info("Successfully getShuffleData cost {} ms for shuffle"
+ " data with {}", readTime, requestInfo);
reply = GetLocalShuffleDataResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.setData(UnsafeByteOperations.unsafeWrap(sdr.getData()))
.build();
@@ -570,7 +549,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
msg = "Error happened when get shuffle data for " + requestInfo + ", "
+ e.getMessage();
LOG.error(msg, e);
reply = GetLocalShuffleDataResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
} finally {
@@ -581,7 +560,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
msg = "Can't require memory to get shuffle data";
LOG.error(msg + " for " + requestInfo);
reply = GetLocalShuffleDataResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
}
@@ -624,7 +603,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.length);
GetLocalShuffleIndexResponse.Builder builder =
GetLocalShuffleIndexResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg);
LOG.info("Successfully getShuffleIndex cost {} ms for {}"
+ " bytes with {}", readTime, data.length, requestInfo);
@@ -636,14 +615,14 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.warn("Index file for {} is not found, maybe the data has been
flushed to cold storage.",
requestInfo, indexFileNotFoundException);
reply = GetLocalShuffleIndexResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.build();
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle index for " + requestInfo + ",
" + e.getMessage();
LOG.error(msg, e);
reply = GetLocalShuffleIndexResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
} finally {
@@ -654,7 +633,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
msg = "Can't require memory to get shuffle index";
LOG.error(msg + " for " + requestInfo);
reply = GetLocalShuffleIndexResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
}
@@ -721,7 +700,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
+ " data for {}", costTime, data.length, requestInfo);
reply = GetMemoryShuffleDataResponse.newBuilder()
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.setData(UnsafeByteOperations.unsafeWrap(data))
.addAllShuffleDataBlockSegments(toShuffleDataBlockSegments(bufferSegments))
@@ -734,7 +713,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
reply = GetMemoryShuffleDataResponse.newBuilder()
.setData(UnsafeByteOperations.unsafeWrap(new byte[]{}))
.addAllShuffleDataBlockSegments(Lists.newArrayList())
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
} finally {
@@ -747,7 +726,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
reply = GetMemoryShuffleDataResponse.newBuilder()
.setData(UnsafeByteOperations.unsafeWrap(new byte[]{}))
.addAllShuffleDataBlockSegments(Lists.newArrayList())
- .setStatus(valueOf(status))
+ .setStatus(status.toProto())
.setRetMsg(msg)
.build();
}
diff --git a/server/src/main/java/org/apache/uniffle/server/StatusCode.java
b/server/src/main/java/org/apache/uniffle/server/StatusCode.java
index 64f16df9..98fe3973 100644
--- a/server/src/main/java/org/apache/uniffle/server/StatusCode.java
+++ b/server/src/main/java/org/apache/uniffle/server/StatusCode.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.server;
+import org.apache.uniffle.proto.RssProtos;
+
public enum StatusCode {
SUCCESS(0),
DOUBLE_REGISTER(1),
@@ -25,7 +27,8 @@ public enum StatusCode {
NO_REGISTER(4),
NO_PARTITION(5),
INTERNAL_ERROR(6),
- TIMEOUT(7);
+ TIMEOUT(7),
+ ACCESS_DENIED(8);
private final int statusCode;
@@ -36,4 +39,9 @@ public enum StatusCode {
public int statusCode() {
return statusCode;
}
+
+ public RssProtos.StatusCode toProto() {
+ RssProtos.StatusCode code =
RssProtos.StatusCode.forNumber(this.statusCode());
+ return code == null ? RssProtos.StatusCode.INTERNAL_ERROR : code;
+ }
}
diff --git a/server/src/test/java/org/apache/uniffle/server/StatusCodeTest.java
b/server/src/test/java/org/apache/uniffle/server/StatusCodeTest.java
new file mode 100644
index 00000000..8975393f
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/StatusCodeTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.proto.RssProtos;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class StatusCodeTest {
+
+ @Test
+ public void test() throws Exception {
+ RssProtos.StatusCode[] protoStatusCode = RssProtos.StatusCode.values();
+ for (RssProtos.StatusCode statusCode : protoStatusCode) {
+ try {
+ if (RssProtos.StatusCode.UNRECOGNIZED.equals(statusCode)) {
+ continue;
+ }
+ StatusCode.valueOf(statusCode.name());
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ StatusCode[] statusCodes = StatusCode.values();
+ for (StatusCode statusCode : statusCodes) {
+ try {
+ RssProtos.StatusCode.valueOf(statusCode.name());
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ for (int i = 0; i < statusCodes.length; i++) {
+ assertEquals(protoStatusCode[i], statusCodes[i].toProto());
+ }
+ }
+}