This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_ratis_npe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7f6e4494c31cc8ddf0fe89f9bb2d73197921af50 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Aug 10 14:44:13 2022 +0800 fix the NPE error when write failed using Ratis --- .../common/response/ConsensusWriteResponse.java | 17 +++++++++++++++++ .../plan/scheduler/FragmentInstanceDispatcherImpl.java | 11 ++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java index 5319588a78..fdb7626c1d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java @@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.common.response; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.rpc.TSStatusCode; public class ConsensusWriteResponse extends ConsensusResponse { @@ -40,6 +41,22 @@ public class ConsensusWriteResponse extends ConsensusResponse { return "ConsensusWriteResponse{" + "status=" + status + "} " + super.toString(); } + public boolean isSuccessful() { + return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + + public String getErrorMessage() { + if (status != null && status.message != null && status.message.length() > 0) { + return status.message; + } + if (exception != null + && exception.getMessage() != null + && exception.getMessage().length() > 0) { + return exception.getMessage(); + } + return "unknown error message"; + } + public static ConsensusWriteResponse.Builder newBuilder() { return new ConsensusWriteResponse.Builder(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index fa3e47eeb2..f4a756d522 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -275,9 +275,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { writeResponse = SchemaRegionConsensusImpl.getInstance().write(groupId, planNode); } - if (writeResponse.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - logger.error(writeResponse.getStatus().message); - throw new FragmentInstanceDispatchException(writeResponse.getStatus()); + if (!writeResponse.isSuccessful()) { + logger.error(writeResponse.getErrorMessage()); + TSStatus failureStatus = + writeResponse.getStatus() != null + ? writeResponse.getStatus() + : RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, writeResponse.getErrorMessage()); + throw new FragmentInstanceDispatchException(failureStatus); } else if (hasFailedMeasurement) { throw new FragmentInstanceDispatchException( RpcUtils.getStatus(
