This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9d2a98b70f2 [to dev/1.3] [RTO/RPO] Unify retry logic on SessionConnection (#14927)
9d2a98b70f2 is described below
commit 9d2a98b70f238c4a4151eef237484374bff80592
Author: William Song <[email protected]>
AuthorDate: Mon Feb 24 10:19:08 2025 +0800
[to dev/1.3] [RTO/RPO] Unify retry logic on SessionConnection (#14927)
* cherry-pick merge
* add back load analyze exp
* add back load analyze exp
---
.../apache/iotdb/session/SessionConnection.java | 1195 ++++++--------------
1 file changed, 331 insertions(+), 864 deletions(-)
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 4a8c3a36ce5..425ae908e43 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -71,6 +71,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -262,23 +263,14 @@ public class SessionConnection {
protected void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
- TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
- TSStatus resp;
- try {
- resp = client.setTimeZone(req);
- } catch (TException e) {
- if (reconnect()) {
- try {
- req.setSessionId(sessionId);
- resp = client.setTimeZone(req);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
- RpcUtils.verifySuccess(resp);
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId,
zoneId);
+ return client.setTimeZone(req);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
setTimeZoneOfSession(zoneId);
}
@@ -295,93 +287,52 @@ public class SessionConnection {
protected void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
- } catch (TException e) {
- if (reconnect()) {
- try {
- RpcUtils.verifySuccess(client.setStorageGroup(sessionId,
storageGroup));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(() -> client.setStorageGroup(sessionId,
storageGroup)).getResult();
+ RpcUtils.verifySuccess(status);
}
protected void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId,
storageGroups));
- } catch (TException e) {
- if (reconnect()) {
- try {
- RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId,
storageGroups));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(() -> client.deleteStorageGroups(sessionId,
storageGroups)).getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createTimeseries(TSCreateTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createTimeseries(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createTimeseries(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createTimeseries(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createAlignedTimeseries(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createMultiTimeseries(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createMultiTimeseries(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createMultiTimeseries(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected boolean checkTimeseriesExists(String path, long timeout)
@@ -406,26 +357,22 @@ public class SessionConnection {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql,
statementId);
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeout);
- TSExecuteStatementResp execResp;
- try {
- execReq.setEnableRedirectQuery(enableRedirect);
- execResp = client.executeQueryStatementV2(execReq);
+ execReq.setEnableRedirectQuery(enableRedirect);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ execReq.setSessionId(sessionId);
+ execReq.setStatementId(statementId);
+ return client.executeQueryStatementV2(execReq);
+ });
+ TSExecuteStatementResp execResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execReq.setSessionId(sessionId);
- execReq.setStatementId(statementId);
- execResp = client.executeQueryStatementV2(execReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(execResp.getStatus());
}
- RpcUtils.verifySuccess(execResp.getStatus());
return new SessionDataSet(
sql,
execResp.getColumns(),
@@ -445,49 +392,8 @@ public class SessionConnection {
protected void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
-
TSExecuteStatementReq request = new TSExecuteStatementReq(sessionId, sql,
statementId);
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = executeNonQueryStatementInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- RpcUtils.verifySuccess(status);
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerify(() -> executeNonQueryStatementInternal(request));
}
private TSStatus executeNonQueryStatementInternal(TSExecuteStatementReq
request)
@@ -504,26 +410,23 @@ public class SessionConnection {
new TSRawDataQueryReq(sessionId, paths, startTime, endTime,
statementId);
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeOut);
- TSExecuteStatementResp execResp;
- try {
- execReq.setEnableRedirectQuery(enableRedirect);
- execResp = client.executeRawDataQueryV2(execReq);
+ execReq.setEnableRedirectQuery(enableRedirect);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ execReq.setSessionId(sessionId);
+ execReq.setStatementId(statementId);
+ return client.executeRawDataQueryV2(execReq);
+ });
+
+ TSExecuteStatementResp execResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execReq.setSessionId(sessionId);
- execReq.setStatementId(statementId);
- execResp = client.executeRawDataQueryV2(execReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(execResp.getStatus());
}
- RpcUtils.verifySuccess(execResp.getStatus());
return new SessionDataSet(
"",
execResp.getColumns(),
@@ -548,28 +451,27 @@ public class SessionConnection {
req.setEnableRedirectQuery(enableRedirect);
req.setLegalPathNodes(isLegalPathNodes);
req.setTimeout(timeOut);
- TSExecuteStatementResp tsExecuteStatementResp = null;
TEndPoint redirectedEndPoint = null;
- try {
- tsExecuteStatementResp =
client.executeFastLastDataQueryForOneDeviceV2(req);
-
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
- } catch (RedirectException e) {
- redirectedEndPoint = e.getEndPoint();
- } catch (TException e) {
- if (reconnect()) {
- try {
- req.setSessionId(sessionId);
- req.setStatementId(statementId);
- tsExecuteStatementResp =
client.executeFastLastDataQueryForOneDeviceV2(req);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ req.setSessionId(sessionId);
+ req.setStatementId(statementId);
+ return client.executeFastLastDataQueryForOneDeviceV2(req);
+ });
+
+ TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
+ try {
+
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+ } catch (RedirectException e) {
+ redirectedEndPoint = e.getEndPoint();
}
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
}
- RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
return new Pair<>(
new SessionDataSet(
"",
@@ -594,25 +496,22 @@ public class SessionConnection {
tsLastDataQueryReq.setFetchSize(session.fetchSize);
tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
tsLastDataQueryReq.setTimeout(timeOut);
- TSExecuteStatementResp tsExecuteStatementResp;
- try {
- tsExecuteStatementResp =
client.executeLastDataQueryV2(tsLastDataQueryReq);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ tsLastDataQueryReq.setSessionId(sessionId);
+ tsLastDataQueryReq.setStatementId(statementId);
+ return client.executeLastDataQueryV2(tsLastDataQueryReq);
+ });
+ final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- tsLastDataQueryReq.setSessionId(sessionId);
- tsLastDataQueryReq.setStatementId(statementId);
- tsExecuteStatementResp =
client.executeLastDataQueryV2(tsLastDataQueryReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
}
- RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
return new SessionDataSet(
"",
tsExecuteStatementResp.getColumns(),
@@ -676,25 +575,21 @@ public class SessionConnection {
private SessionDataSet executeAggregationQuery(TSAggregationQueryReq
tsAggregationQueryReq)
throws StatementExecutionException, IoTDBConnectionException,
RedirectException {
- TSExecuteStatementResp tsExecuteStatementResp;
- try {
- tsExecuteStatementResp =
client.executeAggregationQueryV2(tsAggregationQueryReq);
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ tsAggregationQueryReq.setSessionId(sessionId);
+ tsAggregationQueryReq.setStatementId(statementId);
+ return client.executeAggregationQueryV2(tsAggregationQueryReq);
+ });
+
+ TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- tsAggregationQueryReq.setSessionId(sessionId);
- tsAggregationQueryReq.setStatementId(statementId);
- tsExecuteStatementResp =
client.executeAggregationQuery(tsAggregationQueryReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
}
- RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
return new SessionDataSet(
"",
tsExecuteStatementResp.getColumns(),
@@ -721,52 +616,7 @@ public class SessionConnection {
protected void insertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
}
private TSStatus insertRecordInternal(TSInsertRecordReq request) throws
TException {
@@ -776,52 +626,7 @@ public class SessionConnection {
protected void insertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
}
private TSStatus insertRecordInternal(TSInsertStringRecordReq request)
throws TException {
@@ -831,52 +636,8 @@ public class SessionConnection {
protected void insertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordsInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
request.getPrefixPaths());
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ () -> insertRecordsInternal(request), request::getPrefixPaths);
}
private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws
TException {
@@ -886,53 +647,8 @@ public class SessionConnection {
protected void insertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordsInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
request.getPrefixPaths());
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ () -> insertRecordsInternal(request), request::getPrefixPaths);
}
private TSStatus insertRecordsInternal(TSInsertStringRecordsReq request)
throws TException {
@@ -942,53 +658,7 @@ public class SessionConnection {
protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq
request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordsOfOneDeviceInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() ->
insertRecordsOfOneDeviceInternal(request));
}
private TSStatus
insertRecordsOfOneDeviceInternal(TSInsertRecordsOfOneDeviceReq request)
@@ -999,53 +669,7 @@ public class SessionConnection {
protected void
insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertStringRecordsOfOneDeviceInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() ->
insertStringRecordsOfOneDeviceInternal(request));
}
private TSStatus insertStringRecordsOfOneDeviceInternal(
@@ -1054,57 +678,48 @@ public class SessionConnection {
return client.insertStringRecordsOfOneDevice(request);
}
- protected void insertTablet(TSInsertTabletReq request)
- throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ private void callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ TFunction<TSStatus> function, Supplier<List<String>> pathSupplier)
+ throws StatementExecutionException, RedirectException,
IoTDBConnectionException {
+ RetryResult<TSStatus> result = callWithRetry(function);
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertTabletInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
+ TSStatus status = result.getResult();
+ if (status != null) {
+ if (result.getRetryAttempts() == 0) {
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
pathSupplier.get());
+ } else {
+ RpcUtils.verifySuccess(status);
}
+ } else if (result.getException() != null) {
+ throw new IoTDBConnectionException(result.getException());
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
}
+ }
+ private void callWithRetryAndVerifyWithRedirection(TFunction<TSStatus>
function)
+ throws StatementExecutionException, RedirectException,
IoTDBConnectionException {
+ RetryResult<TSStatus> result = callWithRetry(function);
+
+ TSStatus status = result.getResult();
if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
+ if (result.getRetryAttempts() == 0) {
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ RpcUtils.verifySuccess(status);
+ }
+ } else if (result.getException() != null) {
+ throw new IoTDBConnectionException(result.getException());
} else {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
+ protected void insertTablet(TSInsertTabletReq request)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ callWithRetryAndVerifyWithRedirection(() -> insertTabletInternal(request));
+ }
+
private TSStatus insertTabletInternal(TSInsertTabletReq request) throws
TException {
request.setSessionId(sessionId);
return client.insertTablet(request);
@@ -1112,53 +727,8 @@ public class SessionConnection {
protected void insertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertTabletsInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
request.getPrefixPaths());
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ () -> insertTabletsInternal(request), request::getPrefixPaths);
}
private TSStatus insertTabletsInternal(TSInsertTabletsReq request) throws
TException {
@@ -1168,55 +738,31 @@ public class SessionConnection {
protected void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
+ callWithRetryAndVerify(() -> client.deleteTimeseries(sessionId, paths));
+ }
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = client.deleteTimeseries(sessionId, paths);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- RpcUtils.verifySuccess(status);
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
+ public void deleteData(TSDeleteDataReq request)
+ throws IoTDBConnectionException, StatementExecutionException {
+ callWithRetryAndVerify(() -> deleteDataInternal(request));
+ }
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
+ private void callWithRetryAndVerify(TFunction<TSStatus> rpc)
+ throws IoTDBConnectionException, StatementExecutionException {
+ RetryResult<TSStatus> result = callWithRetry(rpc);
+ if (result.getResult() != null) {
+ RpcUtils.verifySuccess(result.getResult());
+ } else if (result.getException() != null) {
+ throw new IoTDBConnectionException(result.getException());
} else {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
- public void deleteData(TSDeleteDataReq request)
- throws IoTDBConnectionException, StatementExecutionException {
-
+ private RetryResult<TSStatus> callWithRetry(TFunction<TSStatus> rpc) {
TException lastTException = null;
TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
+ int i;
+ for (i = 0; i <= maxRetryCount; i++) {
if (i > 0) {
// re-init the TException and TSStatus
lastTException = null;
@@ -1225,7 +771,13 @@ public class SessionConnection {
try {
TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
} catch (InterruptedException e) {
- // just ignore
+ Thread.currentThread().interrupt();
+ logger.warn(
+ "Thread {} was interrupted during retry {} with wait time {} ms.
Exiting retry loop.",
+ Thread.currentThread().getName(),
+ i,
+ retryIntervalInMs);
+ break;
}
if (!reconnect()) {
// reconnect failed, just continue to make another retry.
@@ -1233,27 +785,19 @@ public class SessionConnection {
}
}
try {
- status = deleteDataInternal(request);
+ status = rpc.run();
// need retry
if (status.isSetNeedRetry() && status.isNeedRetry()) {
continue;
}
- // succeed or don't need to retry
- RpcUtils.verifySuccess(status);
- return;
+ break;
} catch (TException e) {
// all network exception need retry until reaching maxRetryCount
lastTException = e;
}
}
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ return new RetryResult<>(status, lastTException, i);
}
private TSStatus deleteDataInternal(TSDeleteDataReq request) throws TException {
@@ -1263,116 +807,74 @@ public class SessionConnection {
protected void testInsertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertStringRecord(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertStringRecord(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertStringRecord(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void testInsertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertRecord(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertRecord(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertRecord(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
public void testInsertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertStringRecords(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertStringRecords(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertStringRecords(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
public void testInsertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertRecords(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertRecords(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertRecords(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void testInsertTablet(TSInsertTabletReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertTablet(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertTablet(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertTablet(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void testInsertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertTablets(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertTablets(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertTablets(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
@SuppressWarnings({
@@ -1425,189 +927,121 @@ public class SessionConnection {
protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.appendSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.pruneSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
throws StatementExecutionException, IoTDBConnectionException {
- TSQueryTemplateResp execResp;
- req.setSessionId(sessionId);
- try {
- execResp = client.querySchemaTemplate(req);
- RpcUtils.verifySuccess(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execResp = client.querySchemaTemplate(req);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
-
+ final TSQueryTemplateResp execResp =
+ callWithReconnect(
+ () -> {
+ req.setSessionId(sessionId);
+ return client.querySchemaTemplate(req);
+ })
+ .getResult();
RpcUtils.verifySuccess(execResp.getStatus());
return execResp;
}
protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.setSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.setSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.setSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.unsetSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.dropSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createTimeseriesUsingSchemaTemplate(
TCreateTimeseriesUsingSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
- }
- }
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createTimeseriesUsingSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected TSBackupConfigurationResp getBackupConfiguration()
throws IoTDBConnectionException, StatementExecutionException {
- TSBackupConfigurationResp execResp;
- try {
- execResp = client.getBackupConfiguration();
- RpcUtils.verifySuccess(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execResp = client.getBackupConfiguration();
- RpcUtils.verifySuccess(execResp.getStatus());
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ final TSBackupConfigurationResp execResp =
+ callWithReconnect(() -> client.getBackupConfiguration()).getResult();
+ RpcUtils.verifySuccess(execResp.getStatus());
return execResp;
}
- public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+ private <T> RetryResult<T> callWithReconnect(TFunction<T> supplier)
+ throws IoTDBConnectionException {
+ T ret;
try {
- return client.fetchAllConnectionsInfo();
+ ret = supplier.run();
+ return new RetryResult<>(ret, null, 0);
} catch (TException e) {
if (reconnect()) {
try {
- return client.fetchAllConnectionsInfo();
+ ret = supplier.run();
+ return new RetryResult<>(ret, null, 1);
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -1617,6 +1051,10 @@ public class SessionConnection {
}
}
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+ return callWithReconnect(() -> client.fetchAllConnectionsInfo()).getResult();
+ }
+
public boolean isEnableRedirect() {
return enableRedirect;
}
@@ -1652,4 +1090,33 @@ public class SessionConnection {
public String toString() {
return "SessionConnection{" + " endPoint=" + endPoint + "}";
}
+
+ private interface TFunction<T> {
+ T run() throws TException;
+ }
+
+ private static class RetryResult<T> {
+ private final T result;
+ private final TException exception;
+ private final int retryAttempts;
+
+ public RetryResult(T result, TException exception, int retryAttempts) {
+ Preconditions.checkArgument(result == null || exception == null);
+ this.result = result;
+ this.exception = exception;
+ this.retryAttempts = retryAttempts;
+ }
+
+ public int getRetryAttempts() {
+ return retryAttempts;
+ }
+
+ public TException getException() {
+ return exception;
+ }
+
+ public T getResult() {
+ return result;
+ }
+ }
}