imay closed pull request #354: Avoid 'No more data to read' error when handling
stream load RPC
URL: https://github.com/apache/incubator-doris/pull/354
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/fe/src/main/java/org/apache/doris/common/UserException.java
b/fe/src/main/java/org/apache/doris/common/UserException.java
index 7f04a08f..7add338f 100644
--- a/fe/src/main/java/org/apache/doris/common/UserException.java
+++ b/fe/src/main/java/org/apache/doris/common/UserException.java
@@ -17,24 +17,26 @@
package org.apache.doris.common;
+import com.google.common.base.Strings;
+
/**
* Thrown for internal server errors.
*/
public class UserException extends Exception {
public UserException(String msg, Throwable cause) {
- super(msg, cause);
+ super(Strings.nullToEmpty(msg), cause);
}
public UserException(Throwable cause) {
super(cause);
}
- public UserException(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
+ public UserException(String msg, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(Strings.nullToEmpty(msg), cause, enableSuppression,
writableStackTrace);
}
public UserException(String msg) {
- super(msg);
+ super(Strings.nullToEmpty(msg));
}
}
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9c6d2668..72ca0f9c 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -396,11 +396,11 @@ public TFeResult miniLoad(TMiniLoadRequest request)
throws TException {
} catch (UserException e) {
LOG.warn("add mini load error", e);
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
- status.setError_msgs(Lists.newArrayList(e.getMessage()));
+ status.addToError_msgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("unexpected exception when adding mini load", e);
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
- status.setError_msgs(Lists.newArrayList(e.getMessage()));
+ status.addToError_msgs(Strings.nullToEmpty(e.getMessage()));
} finally {
ConnectContext.remove();
}
@@ -465,7 +465,7 @@ public TFeResult
updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request
String failMsg = "job does not exist. id: " + jobId;
LOG.warn(failMsg);
status.setStatus_code(TStatusCode.CANCELLED);
- status.setError_msgs(Lists.newArrayList(failMsg));
+ status.addToError_msgs(failMsg);
return result;
}
@@ -474,7 +474,7 @@ public TFeResult
updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request
String failMsg = "task info does not exist. task id: " + taskId +
", job id: " + jobId;
LOG.warn(failMsg);
status.setStatus_code(TStatusCode.CANCELLED);
- status.setError_msgs(Lists.newArrayList(failMsg));
+ status.addToError_msgs(failMsg);
return result;
}
@@ -554,12 +554,12 @@ public TFeResult loadCheck(TLoadCheckRequest request)
throws TException {
request.getTbl(), request.getUser_ip(),
PrivPredicate.LOAD);
} catch (UserException e) {
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
- status.setError_msgs(Lists.newArrayList(e.getMessage()));
+ status.addToError_msgs(e.getMessage());
return result;
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatus_code(TStatusCode.INTERNAL_ERROR);
- status.setError_msgs(Lists.newArrayList(e.getMessage()));
+ status.addToError_msgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
@@ -576,11 +576,17 @@ public TLoadTxnBeginResult
loadTxnBegin(TLoadTxnBeginRequest request) throws TEx
result.setTxnId(loadTxnBeginImpl(request));
} catch (LabelAlreadyExistsException e) {
status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS);
- status.setError_msgs(Lists.newArrayList(e.getMessage()));
+ status.addToError_msgs(e.getMessage());
} catch (UserException e) {
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
- status.setError_msgs(Lists.newArrayList(e.getMessage()));
+ status.addToError_msgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatus_code(TStatusCode.INTERNAL_ERROR);
+ status.addToError_msgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
}
+
return result;
}
@@ -624,12 +630,16 @@ public TLoadTxnCommitResult
loadTxnCommit(TLoadTxnCommitRequest request) throws
if (!loadTxnCommitImpl(request)) {
// committed success but not visible
status.setStatus_code(TStatusCode.PUBLISH_TIMEOUT);
- status.setError_msgs(
- Lists.newArrayList("transaction commit successfully,
BUT data will be visible later"));
+ status.addToError_msgs("transaction commit successfully, BUT
data will be visible later");
}
} catch (UserException e) {
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
status.addToError_msgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatus_code(TStatusCode.INTERNAL_ERROR);
+ status.addToError_msgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
}
return result;
}
@@ -674,6 +684,11 @@ public TLoadTxnRollbackResult
loadTxnRollback(TLoadTxnRollbackRequest request) t
} catch (UserException e) {
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
status.addToError_msgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatus_code(TStatusCode.INTERNAL_ERROR);
+ status.addToError_msgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
}
return result;
@@ -704,6 +719,11 @@ public TStreamLoadPutResult
streamLoadPut(TStreamLoadPutRequest request) throws
} catch (UserException e) {
status.setStatus_code(TStatusCode.ANALYSIS_ERROR);
status.addToError_msgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatus_code(TStatusCode.INTERNAL_ERROR);
+ status.addToError_msgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
}
return result;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]