This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d66bd13904 [INLONG-11683][SDK] Optimize the functions return of the
ProxyConfigManager class (#11684)
d66bd13904 is described below
commit d66bd139040a4e1c1f041e02c50885da169ec6cb
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jan 20 09:45:21 2025 +0800
[INLONG-11683][SDK] Optimize the functions return of the ProxyConfigManager
class (#11684)
* [INLONG-11683][SDK] Optimize the functions return of the
ProxyConfigManager class
* [INLONG-11683][SDK] Optimize the functions return of the
ProxyConfigManager class
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 18 +-
.../inlong/sdk/dataproxy/common/ErrorCode.java | 96 +++++
.../inlong/sdk/dataproxy/common/ProcessResult.java | 106 ++++++
.../inlong/sdk/dataproxy/common/SdkConsts.java | 2 -
.../sdk/dataproxy/config/ProxyConfigManager.java | 395 +++++++++++----------
.../inlong/sdk/dataproxy/network/ClientMgr.java | 21 +-
.../sdk/dataproxy/network/HttpProxySender.java | 24 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 20 ++
.../sdk/dataproxy/ProxyConfigManagerTest.java | 7 +-
inlong-sdk/pom.xml | 2 +-
10 files changed, 467 insertions(+), 224 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index b887c399e5..290b856ebe 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
@@ -30,7 +31,6 @@ import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
-import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,20 +102,20 @@ public class DefaultMessageSender implements
MessageSender {
ThreadFactory selfDefineFactory) throws Exception {
LOGGER.info("Initial tcp sender, configure is {}", tcpConfig);
// initial sender object
+ ProcessResult procResult = new ProcessResult();
ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(tcpConfig);
- Tuple2<ProxyConfigEntry, String> result =
- proxyConfigManager.getGroupIdConfigure(true);
- if (result.getF0() == null) {
- throw new Exception(result.getF1());
+ if (!proxyConfigManager.getGroupIdConfigure(true, procResult)) {
+ throw new Exception(procResult.toString());
}
- DefaultMessageSender sender =
CACHE_SENDER.get(result.getF0().getClusterId());
+ ProxyConfigEntry configEntry = (ProxyConfigEntry)
procResult.getRetData();
+ DefaultMessageSender sender =
CACHE_SENDER.get(configEntry.getClusterId());
if (sender != null) {
return sender;
} else {
DefaultMessageSender tmpMessageSender =
new DefaultMessageSender(tcpConfig, selfDefineFactory);
-
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
- CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
+
tmpMessageSender.setMaxPacketLength(configEntry.getMaxPacketLength());
+ CACHE_SENDER.put(configEntry.getClusterId(), tmpMessageSender);
return tmpMessageSender;
}
}
@@ -209,7 +209,7 @@ public class DefaultMessageSender implements MessageSender {
}
public String getSDKVersion() {
- return SdkConsts.PROXY_SDK_VERSION;
+ return ProxyUtils.getJarVersion();
}
private SendResult attemptSendMessage(Function<Sender, SendResult>
sendOperation) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
new file mode 100644
index 0000000000..5f4cd0cdc8
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.common;
+
+import org.apache.commons.lang3.math.NumberUtils;
+
+/**
+ * Error Code class
+ *
+ * Used to identify different types of errors
+ */
+public enum ErrorCode {
+
+ OK(0, "Ok"),
+
+ SDK_CLOSED(11, "SDK service closed"),
+ //
+ ILLEGAL_CALL_STATE(21, "Only allowed for meta query"),
+ CONFIGURE_NOT_INITIALIZED(22, "Configure not initialized"),
+ FREQUENT_RMT_FAILURE_VISIT(23, "Frequent manager failure visit"),
+
+ // file visit
+ LOCAL_FILE_NOT_EXIST(31, "Local file not exist"),
+ LOCAL_FILE_EXPIRED(32, "Local file expired"),
+ READ_LOCAL_FILE_FAILURE(33, "Read local file failure"),
+ BLANK_FILE_CONTENT(34, "Blank file content"),
+ PARSE_FILE_CONTENT_FAILURE(35, "Parse file content failure"),
+ //
+ PARSE_FILE_CONTENT_IS_NULL(36, "Parse file content is null"),
+
+ // remote visit
+ BUILD_HTTP_CLIENT_EXCEPTION(41, "Build http client exception"),
+ HTTP_VISIT_EXCEPTION(42, "Visit http server exception"),
+ RMT_RETURN_FAILURE(43, "Http server return failure"),
+ RMT_RETURN_BLANK_CONTENT(44, "Http server return blank content"),
+ PARSE_RMT_CONTENT_FAILURE(45, "Parse manager content failure"),
+ //
+ PARSE_RMT_CONTENT_IS_NULL(46, "Parse manager content is null"),
+ RMT_RETURN_ERROR(47, "Manager return error info"),
+ META_FIELD_DATA_IS_NULL(48, "Field data is null"),
+ META_NODE_LIST_IS_EMPTY(49, "Field nodeList is empty"),
+ NODE_LIST_RECORD_INVALID(50, "No valid nodeList records"),
+ //
+ PARSE_PROXY_META_EXCEPTION(51, "No valid nodeList records"),
+ PARSE_ENCRYPT_META_EXCEPTION(52, "Parse encrypt content failure"),
+ META_REQUIRED_FIELD_NOT_EXIST(53, "Required meta field not exist"),
+ META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),
+
+ //
+ UNKNOWN_ERROR(9999, "Unknown error");
+
+ public static ErrorCode valueOf(int value) {
+ for (ErrorCode errCode : ErrorCode.values()) {
+ if (errCode.getErrCode() == value) {
+ return errCode;
+ }
+ }
+ return UNKNOWN_ERROR;
+ }
+
+ public static String getErrMsg(String errCode) {
+ int codeVal = NumberUtils.toInt(errCode, Integer.MAX_VALUE);
+ return valueOf(codeVal).errMsg;
+ }
+
+ private final int errCode;
+ private final String errMsg;
+
+ ErrorCode(int errCode, String errMsg) {
+ this.errCode = errCode;
+ this.errMsg = errMsg;
+ }
+
+ public int getErrCode() {
+ return errCode;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProcessResult.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProcessResult.java
new file mode 100644
index 0000000000..c3763e9c08
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProcessResult.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.common;
+
+/**
+ * Process Result class
+ *
+ * Used to return the result of task processing
+ */
+public class ProcessResult {
+
+ // error code
+ private int errCode = ErrorCode.UNKNOWN_ERROR.getErrCode();
+ // error message
+ private String errMsg = "";
+ // return data if success
+ private Object retData = null;
+
+ public ProcessResult() {
+ //
+ }
+
+ public ProcessResult(ErrorCode errCode) {
+ this.errCode = errCode.getErrCode();
+ }
+
+ public ProcessResult(ErrorCode errCode, String errMsg) {
+ this.errCode = errCode.getErrCode();
+ this.errMsg = errMsg;
+ }
+
+ public boolean setSuccess() {
+ this.errCode = ErrorCode.OK.getErrCode();
+ this.errMsg = "";
+ this.retData = null;
+ return isSuccess();
+ }
+
+ public boolean setSuccess(Object retData) {
+ this.errCode = ErrorCode.OK.getErrCode();
+ this.errMsg = "";
+ this.retData = retData;
+ return isSuccess();
+ }
+
+ public boolean setFailResult(ProcessResult other) {
+ this.errCode = other.getErrCode();
+ this.errMsg = other.getErrMsg();
+ this.retData = other.getRetData();
+ return isSuccess();
+ }
+
+ public boolean setFailResult(ErrorCode errCode) {
+ this.errCode = errCode.getErrCode();
+ this.errMsg = "";
+ this.retData = null;
+ return isSuccess();
+ }
+
+ public boolean setFailResult(ErrorCode errCode, String errMsg) {
+ this.errCode = errCode.getErrCode();
+ this.errMsg = errMsg;
+ this.retData = null;
+ return isSuccess();
+ }
+
+ public boolean isSuccess() {
+ return (this.errCode == ErrorCode.OK.getErrCode());
+ }
+
+ public int getErrCode() {
+ return errCode;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ public Object getRetData() {
+ return retData;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ProcessResult{");
+ sb.append("errCode=").append(errCode);
+ sb.append(", errMsg='").append(errMsg).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 8ae882e21b..1449a5f0d8 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -19,8 +19,6 @@ package org.apache.inlong.sdk.dataproxy.common;
public class SdkConsts {
- public static final String PROXY_SDK_VERSION = "1.2.11";
-
public static String PREFIX_HTTP = "http://";
public static String PREFIX_HTTPS = "https://";
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 36c8f3ec46..b587bfff0d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -20,6 +20,8 @@ package org.apache.inlong.sdk.dataproxy.config;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.common.util.BasicAuth;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
@@ -108,6 +110,7 @@ public class ProxyConfigManager extends Thread {
private String proxyConfigVisitUrl;
private String proxyQueryFailKey;
private String proxyConfigCacheFile;
+ private ProxyConfigEntry proxyConfigEntry = null;
private List<HostInfo> proxyInfoList = new ArrayList<>();
private int oldStat = 0;
private String localMd5;
@@ -141,18 +144,15 @@ public class ProxyConfigManager extends Thread {
* @param configure proxy client configure
* @return process result
*/
- public Tuple2<Boolean, String> updProxyClientConfig(ProxyClientConfig
configure) {
+ public boolean updProxyClientConfig(ProxyClientConfig configure,
ProcessResult procResult) {
if (this.shutDown.get()) {
- return new Tuple2<>(false, "SDK has shutdown!");
- }
- if (configure == null) {
- return new Tuple2<>(false, "ProxyClientConfig is null");
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
if (this.clientManager != null) {
- return new Tuple2<>(false, "Not allowed for non meta-query case!");
+ return procResult.setFailResult(ErrorCode.ILLEGAL_CALL_STATE);
}
this.storeAndBuildMetaConfigure(configure);
- return new Tuple2<>(true, "OK");
+ return procResult.setSuccess();
}
public void shutDown() {
@@ -167,123 +167,95 @@ public class ProxyConfigManager extends Thread {
}
/**
- * get groupId config
+ * query groupId configure from remote manager
*
* @return proxyConfigEntry
- * @throws Exception ex
*/
- public Tuple2<ProxyConfigEntry, String> getGroupIdConfigure(boolean
needRetry) throws Exception {
+ public boolean getGroupIdConfigure(boolean needRetry, ProcessResult
procResult) {
if (shutDown.get()) {
- return new Tuple2<>(null, "SDK has shutdown!");
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
if (mgrConfig == null) {
- return new Tuple2<>(null, "Configure not initialized!");
+ return
procResult.setFailResult(ErrorCode.CONFIGURE_NOT_INITIALIZED);
}
if (mgrConfig.isOnlyUseLocalProxyConfig()) {
- return getLocalProxyListFromFile(this.localProxyConfigStoreFile);
+ return getLocalProxyListFromFile(this.localProxyConfigStoreFile,
procResult);
} else {
boolean readFromRmt = false;
- Tuple2<ProxyConfigEntry, String> result;
- result = tryToReadCacheProxyEntry();
- if (result.getF0() == null) {
+ if (!tryToReadCacheProxyEntry(procResult)) {
int retryCount = 0;
- do {
- result = requestProxyEntryQuietly();
- if (result.getF0() != null || !needRetry ||
shutDown.get()) {
- if (result.getF0() != null) {
- readFromRmt = true;
- }
+ while (!shutDown.get()) {
+ if (requestProxyEntryQuietly(procResult)) {
+ readFromRmt = true;
+ break;
+ }
+ if (!needRetry
+ || ++retryCount >=
mgrConfig.getMetaQueryMaxRetryIfFail()
+ || shutDown.get()) {
break;
}
// sleep then retry
ProxyUtils.sleepSomeTime(mgrConfig.getMetaQueryWaitMsIfFail());
- } while (++retryCount <
mgrConfig.getMetaQueryMaxRetryIfFail());
+ }
}
if (shutDown.get()) {
- return new Tuple2<>(null, "SDK has shutdown!");
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (result.getF0() == null) {
- return new Tuple2<>(null, "Visit manager error:" +
result.getF1());
- } else if (readFromRmt) {
- tryToWriteCacheProxyEntry(result.getF0());
+ if (readFromRmt && procResult.isSuccess()) {
+ tryToWriteCacheProxyEntry((ProxyConfigEntry)
procResult.getRetData());
}
- return result;
+ return procResult.isSuccess();
}
}
/**
- * get encrypt config
+ * query encrypt configure from remote manager
*
* @return proxyConfigEntry
- * @throws Exception ex
*/
- public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean
needRetry) throws Exception {
- if (!mgrConfig.isEnableReportEncrypt()) {
- return new Tuple2<>(null, "Not need data encrypt!");
- }
+ public boolean getEncryptConfigure(boolean needRetry, ProcessResult
procResult) {
if (shutDown.get()) {
- return new Tuple2<>(null, "SDK has shutdown!");
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
if (mgrConfig == null) {
- return new Tuple2<>(null, "Configure not initialized!");
- }
- EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
- if (encryptEntry != null) {
- return new Tuple2<>(encryptEntry, "Ok");
+ return
procResult.setFailResult(ErrorCode.CONFIGURE_NOT_INITIALIZED);
}
boolean readFromRmt = false;
- Tuple2<EncryptConfigEntry, String> result = readCachedPubKeyEntry();
- if (result.getF0() == null) {
+ if (!readCachedPubKeyEntry(procResult)) {
int retryCount = 0;
- do {
- result = requestPubKeyFromManager();
- if (result.getF0() != null || !needRetry || shutDown.get()) {
- if (result.getF0() != null) {
- readFromRmt = true;
- }
+ while (!shutDown.get()) {
+ if (requestPubKeyFromManager(procResult)) {
+ readFromRmt = true;
+ break;
+ }
+ if (!needRetry
+ || ++retryCount >=
mgrConfig.getMetaQueryMaxRetryIfFail()
+ || shutDown.get()) {
break;
}
// sleep then retry
ProxyUtils.sleepSomeTime(mgrConfig.getMetaQueryWaitMsIfFail());
- } while (++retryCount < mgrConfig.getMetaQueryMaxRetryIfFail());
+ }
}
if (shutDown.get()) {
- return new Tuple2<>(null, "SDK has shutdown!");
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (result.getF0() == null) {
- return new Tuple2<>(null, "Visit manager error:" + result.getF1());
- } else if (readFromRmt) {
- updateEncryptConfigEntry(result.getF0());
- writeCachePubKeyEntryFile(result.getF0());
+ if (readFromRmt && procResult.isSuccess()) {
+ writeCachePubKeyEntryFile((EncryptConfigEntry)
procResult.getRetData());
}
- return result;
+ return procResult.isSuccess();
}
@Override
public void run() {
logger.info("ConfigManager({}) thread start, groupId={}",
this.callerId, mgrConfig.getInlongGroupId());
+ long curTime;
+ ProcessResult procResult = new ProcessResult();
while (!shutDown.get()) {
// update proxy nodes meta configures
- try {
- doProxyEntryQueryWork();
- } catch (Throwable ex) {
- if (exptCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) refresh proxy configure
exception, groupId={}",
- this.callerId, mgrConfig.getInlongGroupId(), ex);
- }
- }
- // update encrypt configure
- if (mgrConfig.isEnableReportEncrypt()) {
- try {
- doEncryptConfigEntryQueryWork();
- } catch (Throwable ex) {
- if (exptCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) refresh encrypt info
exception, groupId={}",
- this.callerId, mgrConfig.getInlongGroupId(),
ex);
- }
- }
- }
+ curTime = System.currentTimeMillis();
+ updateMetaInfoFromRemote(procResult);
if (shutDown.get()) {
break;
}
@@ -294,141 +266,179 @@ public class ProxyConfigManager extends Thread {
this.callerId, this.mgrConfig.getInlongGroupId());
}
+ private boolean updateMetaInfoFromRemote(ProcessResult procResult) {
+ // update proxy nodes meta configures
+ if (!doProxyEntryQueryWork(procResult)) {
+ return procResult.isSuccess();
+ }
+ if (mgrConfig.isEnableReportEncrypt()) {
+ if (!doEncryptConfigEntryQueryWork(procResult)) {
+ return procResult.isSuccess();
+ }
+ }
+ return procResult.setSuccess();
+ }
+
+ public ProxyConfigEntry getProxyConfigEntry() {
+ return this.proxyConfigEntry;
+ }
+
+ public EncryptConfigEntry getUserEncryptConfigEntry() {
+ return userEncryptConfigEntry;
+ }
/**
* request proxyHost list from manager, update ClientMgr.proxyHostList and
channels
*
* @throws Exception
*/
- public void doProxyEntryQueryWork() throws Exception {
- if (shutDown.get() || this.clientManager == null) {
- return;
- }
+ public boolean doProxyEntryQueryWork(ProcessResult procResult) {
/* Request the configuration from manager. */
if (localMd5 == null) {
localMd5 = calcHostInfoMd5(proxyInfoList);
}
- Tuple2<ProxyConfigEntry, String> result;
+ ProxyConfigEntry rmtProxyConfigEntry = null;
if (mgrConfig.isOnlyUseLocalProxyConfig()) {
- result = getLocalProxyListFromFile(this.localProxyConfigStoreFile);
+ if (!getLocalProxyListFromFile(this.localProxyConfigStoreFile,
procResult)) {
+ return false;
+ }
+ rmtProxyConfigEntry = (ProxyConfigEntry) procResult.getRetData();
} else {
int retryCnt = 0;
- do {
- result = requestProxyEntryQuietly();
- if (result.getF0() != null || shutDown.get()) {
+ while (!shutDown.get()) {
+ if (requestProxyEntryQuietly(procResult)) {
+ break;
+ }
+ if (++retryCnt >= this.mgrConfig.getMetaSyncMaxRetryIfFail()
|| shutDown.get()) {
break;
}
// sleep then retry.
ProxyUtils.sleepSomeTime(mgrConfig.getMetaSyncWaitMsIfFail());
- } while (++retryCnt < this.mgrConfig.getMetaSyncMaxRetryIfFail()
&& !shutDown.get());
+ }
if (shutDown.get()) {
- return;
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (result.getF0() != null) {
- tryToWriteCacheProxyEntry(result.getF0());
+ if (procResult.isSuccess()) {
+ rmtProxyConfigEntry = (ProxyConfigEntry)
procResult.getRetData();
+ tryToWriteCacheProxyEntry(rmtProxyConfigEntry);
}
/* We should exit if no local IP list and can't request it from
TDManager. */
- if (localMd5 == null && result.getF0() == null) {
+ if (localMd5 == null && rmtProxyConfigEntry == null) {
if (exptCounter.shouldPrint()) {
logger.warn("ConfigManager({}) connect manager({})
failure, get cached configure, groupId={}",
this.callerId, this.proxyConfigVisitUrl,
this.mgrConfig.getInlongGroupId());
}
- result = tryToReadCacheProxyEntry();
+ if (tryToReadCacheProxyEntry(procResult)) {
+ rmtProxyConfigEntry = (ProxyConfigEntry)
procResult.getRetData();
+ }
}
- if (localMd5 != null && result.getF0() == null && proxyInfoList !=
null) {
+ if (localMd5 != null && rmtProxyConfigEntry == null &&
proxyInfoList != null) {
if (exptCounter.shouldPrint()) {
logger.warn("ConfigManager({}) connect manager({})
failure, using the last configure, groupId={}",
this.callerId, this.proxyConfigVisitUrl,
this.mgrConfig.getInlongGroupId());
}
}
}
- if (localMd5 == null && result.getF0() == null && proxyInfoList ==
null) {
- if (mgrConfig.isOnlyUseLocalProxyConfig()) {
- throw new Exception("Read local proxy configure failure,
please check first!");
- } else {
- throw new Exception("Connect Manager failure, please check
first!");
+ if (localMd5 == null && rmtProxyConfigEntry == null && proxyInfoList
== null) {
+ if (exptCounter.shouldPrint()) {
+ if (mgrConfig.isOnlyUseLocalProxyConfig()) {
+ logger.warn("ConfigManager({}) continue fetch proxy meta
failure, localFile={}, groupId={}",
+ this.callerId, this.localProxyConfigStoreFile,
this.mgrConfig.getInlongGroupId());
+ } else {
+ logger.warn("ConfigManager({}) continue fetch proxy meta
failure, manager={}, groupId={}",
+ this.callerId, this.proxyConfigVisitUrl,
this.mgrConfig.getInlongGroupId());
+ }
}
+ return procResult.isSuccess();
}
- compareAndUpdateProxyList(result.getF0());
+ compareAndUpdateProxyList(rmtProxyConfigEntry);
+ return procResult.setSuccess();
}
- private void doEncryptConfigEntryQueryWork() throws Exception {
- if (shutDown.get() || this.clientManager == null) {
- return;
- }
+ public boolean doEncryptConfigEntryQueryWork(ProcessResult procResult) {
int retryCount = 0;
- Tuple2<EncryptConfigEntry, String> result;
- do {
- result = requestPubKeyFromManager();
- if (result.getF0() != null || shutDown.get()) {
+ while (!shutDown.get()) {
+ if (requestPubKeyFromManager(procResult)) {
+ break;
+ }
+ if (++retryCount >= this.mgrConfig.getMetaSyncMaxRetryIfFail() ||
shutDown.get()) {
break;
}
// sleep then retry
ProxyUtils.sleepSomeTime(mgrConfig.getMetaSyncWaitMsIfFail());
- } while (++retryCount < mgrConfig.getMetaSyncMaxRetryIfFail());
+ }
if (shutDown.get()) {
- return;
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (result.getF0() == null) {
- if (this.userEncryptConfigEntry != null) {
- logger.warn("ConfigManager({}) connect manager({}) failure,
using the last pubKey, userName={}",
- this.callerId, this.encryptConfigVisitUrl,
this.mgrConfig.getRptUserName());
- return;
+ if (!procResult.isSuccess()) {
+ if (exptCounter.shouldPrint()) {
+ if (this.userEncryptConfigEntry == null) {
+ logger.warn("ConfigManager({}) continue fetch encrypt meta
failure, manager={}, username={}",
+ this.callerId, this.encryptConfigVisitUrl,
mgrConfig.getRptUserName());
+ } else {
+ logger.warn("ConfigManager({}) fetch encrypt failure,
manager={}, use the last pubKey, username={}",
+ this.callerId, this.encryptConfigVisitUrl,
mgrConfig.getRptUserName());
+ }
}
- throw new Exception("Visit manager error:" + result.getF1());
+ return procResult.isSuccess();
}
- updateEncryptConfigEntry(result.getF0());
- writeCachePubKeyEntryFile(result.getF0());
+ EncryptConfigEntry rmtEncryptEntry =
+ (EncryptConfigEntry) procResult.getRetData();
+ updateEncryptConfigEntry(rmtEncryptEntry);
+ writeCachePubKeyEntryFile(rmtEncryptEntry);
+ return procResult.setSuccess();
}
- public Tuple2<ProxyConfigEntry, String> getLocalProxyListFromFile(String
filePath) {
+ public boolean getLocalProxyListFromFile(String filePath, ProcessResult
procResult) {
String strRet;
try {
byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
strRet = new String(fileBytes);
} catch (Throwable ex) {
- return new Tuple2<>(null, "Read local configure failure from "
- + filePath + ", reason is " + ex.getMessage());
+ return procResult.setFailResult(ErrorCode.READ_LOCAL_FILE_FAILURE,
+ "Read local configure failure from "
+ + filePath + ", reason is " + ex.getMessage());
}
if (StringUtils.isBlank(strRet)) {
- return new Tuple2<>(null, "Blank configure local file from " +
filePath);
+ return procResult.setFailResult(ErrorCode.BLANK_FILE_CONTENT,
+ "Blank configure local file from " + filePath);
}
- return getProxyConfigEntry(false, strRet);
+ return getProxyConfigEntry(false, strRet, procResult);
}
- private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() {
+ private boolean requestProxyEntryQuietly(ProcessResult procResult) {
// check cache failure
String qryResult = getManagerQryResultInFailStatus(true);
if (qryResult != null) {
- return new Tuple2<>(null, "Query fail(" + qryResult + ") just now,
please retry later!");
+ return
procResult.setFailResult(ErrorCode.FREQUENT_RMT_FAILURE_VISIT,
+ "Query fail(" + qryResult + ") just now, retry later!");
}
// request meta info from manager
List<BasicNameValuePair> params = buildProxyNodeQueryParams();
logger.debug("ConfigManager({}) request configure to manager({}),
param={}",
this.callerId, this.proxyConfigVisitUrl, params);
- Tuple2<Boolean, String> queryResult =
- requestConfiguration(true, this.proxyConfigVisitUrl, params);
- if (!queryResult.getF0()) {
- return new Tuple2<>(null, queryResult.getF1());
+
+ if (!requestConfiguration(true, this.proxyConfigVisitUrl, params,
procResult)) {
+ return false;
}
+ String content = (String) procResult.getRetData();
// parse result
logger.debug("ConfigManager({}) received configure, from manager({}),
groupId={}, result={}",
- callerId, proxyConfigVisitUrl, mgrConfig.getInlongGroupId(),
queryResult.getF1());
+ callerId, proxyConfigVisitUrl, mgrConfig.getInlongGroupId(),
content);
try {
- Tuple2<ProxyConfigEntry, String> parseResult =
- getProxyConfigEntry(true, queryResult.getF1());
- if (parseResult.getF0() == null) {
- bookManagerQryFailStatus(true, parseResult.getF1());
- } else {
+ if (getProxyConfigEntry(true, content, procResult)) {
rmvManagerQryFailStatus(true);
+ } else {
+ bookManagerQryFailStatus(true, procResult.getErrMsg());
}
- return parseResult;
+ return procResult.isSuccess();
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) parse failure, from
manager({}), groupId={}, result={}",
- callerId, proxyConfigVisitUrl,
mgrConfig.getInlongGroupId(), queryResult.getF1(), ex);
+ logger.warn("ConfigManager({}) parse exception, from
manager({}), groupId={}, result={}",
+ callerId, proxyConfigVisitUrl,
mgrConfig.getInlongGroupId(), content, ex);
}
bookManagerQryFailStatus(true, ex.getMessage());
- return new Tuple2<>(null, ex.getMessage());
+ return procResult.setFailResult(
+ ErrorCode.PARSE_PROXY_META_EXCEPTION, ex.getMessage());
}
}
@@ -468,6 +478,7 @@ public class ProxyConfigManager extends Thread {
newProxyNodeList = new ArrayList<>(proxyInfoList.size());
newProxyNodeList.addAll(proxyInfoList);
} else {
+ this.proxyConfigEntry = proxyEntry;
newSwitchStat = proxyEntry.getSwitchStat();
newProxyNodeList = new ArrayList<>(proxyEntry.getSize());
for (Map.Entry<String, HostInfo> entry :
proxyEntry.getHostMap().entrySet()) {
@@ -516,7 +527,7 @@ public class ProxyConfigManager extends Thread {
*
* @return read result
*/
- private Tuple2<ProxyConfigEntry, String> tryToReadCacheProxyEntry() {
+ private boolean tryToReadCacheProxyEntry(ProcessResult procResult) {
fileRw.readLock().lock();
try {
File file = new File(this.proxyConfigCacheFile);
@@ -526,63 +537,65 @@ public class ProxyConfigManager extends Thread {
&& diffTime < mgrConfig.getMetaCacheExpiredMs()) {
JsonReader reader = new JsonReader(new
FileReader(this.proxyConfigCacheFile));
ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader,
ProxyConfigEntry.class);
- return new Tuple2<>(proxyConfigEntry, "Ok");
+ return procResult.setSuccess(proxyConfigEntry);
}
- return new Tuple2<>(null, "cache configure expired!");
+ return procResult.setFailResult(ErrorCode.LOCAL_FILE_EXPIRED);
} else {
- return new Tuple2<>(null, "no cache configure!");
+ return
procResult.setFailResult(ErrorCode.LOCAL_FILE_NOT_EXIST);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("ConfigManager({}) read cache file({}) exception,
groupId={}",
this.callerId, this.proxyConfigCacheFile,
this.mgrConfig.getInlongGroupId(), ex);
}
- return new Tuple2<>(null, "read cache configure failure:" +
ex.getMessage());
+ return procResult.setFailResult(
+ ErrorCode.READ_LOCAL_FILE_FAILURE, "read cache configure
failure:" + ex.getMessage());
} finally {
fileRw.readLock().unlock();
}
}
- private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() {
+ private boolean requestPubKeyFromManager(ProcessResult procResult) {
// check cache failure
String qryResult = getManagerQryResultInFailStatus(false);
if (qryResult != null) {
- return new Tuple2<>(null, "Query fail(" + qryResult + ") just now,
please retry later!");
+ procResult.setFailResult(ErrorCode.FREQUENT_RMT_FAILURE_VISIT,
+ "Query fail(" + qryResult + ") just now, retry later!");
}
// request meta info from manager
List<BasicNameValuePair> params = buildPubKeyQueryParams();
logger.debug("ConfigManager({}) request pubkey to manager({}),
param={}",
this.callerId, this.encryptConfigVisitUrl, params);
- Tuple2<Boolean, String> queryResult =
- requestConfiguration(false, this.encryptConfigVisitUrl,
params);
- if (!queryResult.getF0()) {
- return new Tuple2<>(null, queryResult.getF1());
+ if (!requestConfiguration(false, this.encryptConfigVisitUrl, params,
procResult)) {
+ return false;
}
+ String content = (String) procResult.getRetData();
logger.debug("ConfigManager({}) received pubkey from manager({}),
result={}",
- this.callerId, this.encryptConfigVisitUrl,
queryResult.getF1());
+ this.callerId, this.encryptConfigVisitUrl, content);
String errorMsg;
JsonObject pubKeyConf;
try {
- pubKeyConf =
JsonParser.parseString(queryResult.getF1()).getAsJsonObject();
+ pubKeyConf = JsonParser.parseString(content).getAsJsonObject();
} catch (Throwable ex) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) parse failure, userName={},
config={}!",
- this.callerId, this.mgrConfig.getRptUserName(),
queryResult.getF1());
+ this.callerId, this.mgrConfig.getRptUserName(),
content);
}
errorMsg = "parse pubkey failure:" + ex.getMessage();
bookManagerQryFailStatus(false, errorMsg);
- return new Tuple2<>(null, errorMsg);
+ return procResult.setFailResult(
+ ErrorCode.PARSE_RMT_CONTENT_FAILURE, errorMsg);
}
if (pubKeyConf == null) {
errorMsg = "No public key information";
bookManagerQryFailStatus(false, errorMsg);
- return new Tuple2<>(null, errorMsg);
+ return
procResult.setFailResult(ErrorCode.PARSE_RMT_CONTENT_IS_NULL);
}
try {
if (!pubKeyConf.has("resultCode")) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) config failure: resultCode
field not exist, userName={}, config={}!",
- this.callerId, this.mgrConfig.getRptUserName(),
queryResult.getF1());
+ this.callerId, this.mgrConfig.getRptUserName(),
content);
}
throw new Exception("resultCode field not exist");
}
@@ -590,14 +603,14 @@ public class ProxyConfigManager extends Thread {
if (resultCode != 0) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) config failure: resultCode
!= 0, userName={}, config={}!",
- this.callerId, this.mgrConfig.getRptUserName(),
queryResult.getF1());
+ this.callerId, this.mgrConfig.getRptUserName(),
content);
}
throw new Exception("resultCode != 0!");
}
if (!pubKeyConf.has("resultData")) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) config failure: resultData
field not exist, userName={}, config={}!",
- this.callerId, this.mgrConfig.getRptUserName(),
queryResult.getF1());
+ this.callerId, this.mgrConfig.getRptUserName(),
content);
}
throw new Exception("resultData field not exist");
}
@@ -607,7 +620,7 @@ public class ProxyConfigManager extends Thread {
if (StringUtils.isBlank(publicKey)) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) config failure:
publicKey is blank, userName={}, config={}!",
- this.callerId,
this.mgrConfig.getRptUserName(), queryResult.getF1());
+ this.callerId,
this.mgrConfig.getRptUserName(), content);
}
throw new Exception("publicKey is blank!");
}
@@ -615,7 +628,7 @@ public class ProxyConfigManager extends Thread {
if (StringUtils.isBlank(username)) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) config failure:
username is blank, userName={}, config={}!",
- this.callerId,
this.mgrConfig.getRptUserName(), queryResult.getF1());
+ this.callerId,
this.mgrConfig.getRptUserName(), content);
}
throw new Exception("username is blank!");
}
@@ -623,17 +636,17 @@ public class ProxyConfigManager extends Thread {
if (StringUtils.isBlank(versionStr)) {
if (parseCounter.shouldPrint()) {
logger.warn("ConfigManager({}) config failure: version
is blank, userName={}, config={}!",
- this.callerId,
this.mgrConfig.getRptUserName(), queryResult.getF1());
+ this.callerId,
this.mgrConfig.getRptUserName(), content);
}
throw new Exception("version is blank!");
}
rmvManagerQryFailStatus(false);
- return new Tuple2<>(new EncryptConfigEntry(username,
versionStr, publicKey), "Ok");
+ return procResult.setSuccess(new EncryptConfigEntry(username,
versionStr, publicKey));
}
throw new Exception("resultData value is null!");
} catch (Throwable ex) {
bookManagerQryFailStatus(false, ex.getMessage());
- return new Tuple2<>(null, ex.getMessage());
+ return
procResult.setFailResult(ErrorCode.PARSE_ENCRYPT_META_EXCEPTION,
ex.getMessage());
}
}
@@ -642,7 +655,7 @@ public class ProxyConfigManager extends Thread {
this.userEncryptConfigEntry = newEncryptEntry;
}
- private Tuple2<EncryptConfigEntry, String> readCachedPubKeyEntry() {
+ private boolean readCachedPubKeyEntry(ProcessResult procResult) {
ObjectInputStream is;
FileInputStream fis = null;
EncryptConfigEntry entry;
@@ -658,18 +671,19 @@ public class ProxyConfigManager extends Thread {
entry = (EncryptConfigEntry) is.readObject();
// is.close();
fis.close();
- return new Tuple2<>(entry, "Ok");
+ return procResult.setSuccess(entry);
}
- return new Tuple2<>(null, "cache PubKeyEntry expired!");
+ return procResult.setFailResult(ErrorCode.LOCAL_FILE_EXPIRED);
} else {
- return new Tuple2<>(null, "no PubKeyEntry file!");
+ return
procResult.setFailResult(ErrorCode.LOCAL_FILE_NOT_EXIST);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("ConfigManager({}) read({}) file exception,
userName={}",
callerId, encryptConfigCacheFile,
mgrConfig.getRptUserName(), ex);
}
- return new Tuple2<>(null, "read PubKeyEntry file failure:" +
ex.getMessage());
+ return procResult.setFailResult(
+ ErrorCode.READ_LOCAL_FILE_FAILURE, "read PubKeyEntry file
failure:" + ex.getMessage());
} finally {
if (fis != null) {
try {
@@ -717,8 +731,8 @@ public class ProxyConfigManager extends Thread {
}
/* Request new configurations from Manager. */
- private Tuple2<Boolean, String> requestConfiguration(
- boolean queryProxyInfo, String url, List<BasicNameValuePair>
params) {
+ private boolean requestConfiguration(
+ boolean queryProxyInfo, String url, List<BasicNameValuePair>
params, ProcessResult procResult) {
HttpParams myParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(myParams,
mgrConfig.getMgrConnTimeoutMs());
HttpConnectionParams.setSoTimeout(myParams,
mgrConfig.getMgrSocketTimeoutMs());
@@ -735,39 +749,39 @@ public class ProxyConfigManager extends Thread {
logger.warn("ConfigManager({}) create Http(s) client failure,
url={}, params={}",
this.callerId, url, params, eHttp);
}
- return new Tuple2<>(false, eHttp.getMessage());
+ return procResult.setFailResult(
+ ErrorCode.BUILD_HTTP_CLIENT_EXCEPTION, eHttp.getMessage());
}
// post request and get response
HttpPost httpPost = null;
try {
+ String errMsg;
httpPost = new HttpPost(url);
this.addAuthorizationInfo(httpPost);
UrlEncodedFormEntity urlEncodedFormEntity =
new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
httpPost.setEntity(urlEncodedFormEntity);
HttpResponse response = httpClient.execute(httpPost);
- String errMsg;
+ String returnStr = EntityUtils.toString(response.getEntity());
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- errMsg = response.getStatusLine().getStatusCode()
- + ":" + response.getStatusLine().getReasonPhrase();
+ errMsg = response.getStatusLine().getStatusCode() + ":" +
returnStr;
if (response.getStatusLine().getStatusCode() >= 500) {
bookManagerQryFailStatus(queryProxyInfo, errMsg);
}
- return new Tuple2<>(false, errMsg);
+ return procResult.setFailResult(ErrorCode.RMT_RETURN_FAILURE,
errMsg);
}
- String returnStr = EntityUtils.toString(response.getEntity());
if (StringUtils.isBlank(returnStr)) {
errMsg = "server return blank entity!";
bookManagerQryFailStatus(queryProxyInfo, errMsg);
- return new Tuple2<>(false, errMsg);
+ return
procResult.setFailResult(ErrorCode.RMT_RETURN_BLANK_CONTENT, errMsg);
}
- return new Tuple2<>(true, returnStr);
+ return procResult.setSuccess(returnStr);
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("ConfigManager({}) connect manager({}) exception,
params={}",
this.callerId, url, params, ex);
}
- return new Tuple2<>(false, ex.getMessage());
+ return procResult.setFailResult(ErrorCode.HTTP_VISIT_EXCEPTION,
ex.getMessage());
} finally {
if (httpPost != null) {
httpPost.releaseConnection();
@@ -786,8 +800,8 @@ public class ProxyConfigManager extends Thread {
headers.add(new BasicHeader(paramItem.getName(),
paramItem.getValue()));
}
RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout(mgrConfig.getMgrConnTimeoutMs())
- .setSocketTimeout(mgrConfig.getMgrSocketTimeoutMs()).build();
+ .setSocketTimeout(mgrConfig.getMgrSocketTimeoutMs())
+ .setConnectTimeout(mgrConfig.getMgrConnTimeoutMs()).build();
SSLContext sslContext = SSLContexts.custom().build();
SSLConnectionSocketFactory sslSf = new
SSLConnectionSocketFactory(sslContext,
new String[]{mgrConfig.getTlsVersion()}, null,
@@ -894,7 +908,7 @@ public class ProxyConfigManager extends Thread {
return null;
}
- private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(boolean
fromManager, String strRet) {
+ protected boolean getProxyConfigEntry(boolean fromManager, String strRet,
ProcessResult procResult) {
DataProxyNodeResponse proxyNodeConfig;
if (fromManager) {
ProxyClusterConfig clusterConfig;
@@ -905,16 +919,18 @@ public class ProxyConfigManager extends Thread {
logger.warn("ConfigManager({}) parse exception,
groupId={}, config={}",
this.callerId, mgrConfig.getInlongGroupId(),
strRet, ex);
}
- return new Tuple2<>(null, "parse failure:" + ex.getMessage());
+ return procResult.setFailResult(
+ ErrorCode.PARSE_RMT_CONTENT_FAILURE, "parse failure:"
+ ex.getMessage());
}
if (clusterConfig == null) {
- return new Tuple2<>(null, "content parse result is null!");
+ return
procResult.setFailResult(ErrorCode.PARSE_RMT_CONTENT_IS_NULL);
}
if (!clusterConfig.isSuccess()) {
- return new Tuple2<>(null, clusterConfig.getErrMsg());
+ return procResult.setFailResult(
+ ErrorCode.RMT_RETURN_ERROR, clusterConfig.getErrMsg());
}
if (clusterConfig.getData() == null) {
- return new Tuple2<>(null, "return data content is null!");
+ return
procResult.setFailResult(ErrorCode.META_FIELD_DATA_IS_NULL);
}
proxyNodeConfig = clusterConfig.getData();
} else {
@@ -925,16 +941,17 @@ public class ProxyConfigManager extends Thread {
logger.warn("ConfigManager({}) parse local file exception,
groupId={}, config={}",
this.callerId, mgrConfig.getInlongGroupId(),
strRet, ex);
}
- return new Tuple2<>(null, "parse file failure:" +
ex.getMessage());
+ return procResult.setFailResult(
+ ErrorCode.PARSE_FILE_CONTENT_FAILURE, "parse failure:"
+ ex.getMessage());
}
if (proxyNodeConfig == null) {
- return new Tuple2<>(null, "file content parse result is
null!");
+ return
procResult.setFailResult(ErrorCode.PARSE_FILE_CONTENT_IS_NULL);
}
}
// parse nodeList
List<DataProxyNodeInfo> nodeList = proxyNodeConfig.getNodeList();
if (CollectionUtils.isEmpty(nodeList)) {
- return new Tuple2<>(null, "nodeList is empty!");
+ return procResult.setFailResult(ErrorCode.META_NODE_LIST_IS_EMPTY);
}
HostInfo tmpHostInfo;
Map<String, HostInfo> hostMap = new HashMap<>();
@@ -953,7 +970,7 @@ public class ProxyConfigManager extends Thread {
hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
}
if (hostMap.isEmpty()) {
- return new Tuple2<>(null, "no valid nodeList records!");
+ return
procResult.setFailResult(ErrorCode.NODE_LIST_RECORD_INVALID);
}
// parse clusterId
int clusterId = -1;
@@ -985,6 +1002,6 @@ public class ProxyConfigManager extends Thread {
proxyEntry.setLoad(load);
proxyEntry.setMaxPacketLength(
proxyNodeConfig.getMaxPacketLength() != null ?
proxyNodeConfig.getMaxPacketLength() : -1);
- return new Tuple2<>(proxyEntry, "ok");
+ return procResult.setSuccess(proxyEntry);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index b9eec02799..6fb1caf59f 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.network;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
@@ -104,8 +105,9 @@ public class ClientMgr {
if (!started.compareAndSet(false, true)) {
return;
}
+ ProcessResult procResult = new ProcessResult();
try {
- this.configManager.doProxyEntryQueryWork();
+ this.configManager.doProxyEntryQueryWork(procResult);
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.error("ClientMgr({}) query {} exception",
@@ -159,22 +161,23 @@ public class ClientMgr {
if (!this.started.get()) {
throw new Exception("SDK not start or has shutdown!");
}
- Tuple2<ProxyConfigEntry, String> result =
- configManager.getGroupIdConfigure(true);
- if (result.getF0() == null) {
- throw new Exception(result.getF1());
+ ProcessResult procResult = new ProcessResult();
+ if (!configManager.getGroupIdConfigure(true, procResult)) {
+ throw new Exception(procResult.toString());
}
- return result.getF0();
+ return (ProxyConfigEntry) procResult.getRetData();
}
public EncryptConfigEntry getEncryptConfigureInfo() {
if (!this.started.get()) {
return null;
}
- Tuple2<EncryptConfigEntry, String> result;
+ ProcessResult procResult = new ProcessResult();
try {
- result = configManager.getEncryptConfigure(false);
- return result.getF0();
+ if (configManager.getEncryptConfigure(false, procResult)) {
+ return (EncryptConfigEntry) procResult.getRetData();
+ }
+ return null;
} catch (Throwable ex) {
return null;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 9da6305b49..eec1326747 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.dataproxy.network;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
@@ -25,7 +26,6 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
-import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,10 +92,12 @@ public class HttpProxySender extends Thread {
*
* @return proxy config entry.
*/
- private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
- Tuple2<ProxyConfigEntry, String> result =
- proxyConfigManager.getGroupIdConfigure(true);
- return result.getF0();
+ private ProxyConfigEntry retryGettingProxyConfig() {
+ ProcessResult procResult = new ProcessResult();
+ if (proxyConfigManager.getGroupIdConfigure(true, procResult)) {
+ return (ProxyConfigEntry) procResult.getRetData();
+ }
+ return null;
}
/**
@@ -103,19 +105,19 @@ public class HttpProxySender extends Thread {
*/
@Override
public void run() {
+ ProcessResult procResult = new ProcessResult();
while (!bShutDown) {
try {
int rand = ThreadLocalRandom.current().nextInt(0, 600);
long randSleepTime = proxyClientConfig.getMgrMetaSyncInrMs() +
rand;
TimeUnit.MILLISECONDS.sleep(randSleepTime);
if (proxyConfigManager != null) {
- Tuple2<ProxyConfigEntry, String> result =
- proxyConfigManager.getGroupIdConfigure(false);
- if (result.getF0() == null) {
- throw new Exception(result.getF1());
+ if (!proxyConfigManager.getGroupIdConfigure(false,
procResult)) {
+ throw new Exception(procResult.toString());
}
- hostList.addAll(result.getF0().getHostMap().values());
- hostList.retainAll(result.getF0().getHostMap().values());
+ ProxyConfigEntry configEntry = (ProxyConfigEntry)
procResult.getRetData();
+ hostList.addAll(configEntry.getHostMap().values());
+ hostList.retainAll(configEntry.getHostMap().values());
} else {
logger.error("manager is null, please check it!");
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index c74aea445e..0367e6e06a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -26,12 +26,14 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.InputStream;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
public class ProxyUtils {
@@ -43,9 +45,11 @@ public class ProxyUtils {
private static final Set<String> invalidAttr = new HashSet<>();
public static final Set<MsgType> SdkAllowedMsgType = new HashSet<>();
private static String localHost;
+ private static String sdkVersion;
static {
localHost = getLocalIp();
+ getJarVersion();
Collections.addAll(invalidAttr, "groupId", "streamId", "dt",
"msgUUID", "cp",
"cnt", "mt", "m", "sid", "t", "NodeIP", "messageId",
"_file_status_check", "_secretId",
"_signature", "_timeStamp", "_nonce", "_userName",
"_clientIP", "_encyVersion", "_encyAesKey",
@@ -72,6 +76,22 @@ public class ProxyUtils {
return ip;
}
+ public static String getJarVersion() {
+ if (sdkVersion != null) {
+ return sdkVersion;
+ }
+ Properties properties = new Properties();
+ try (InputStream is =
ProxyUtils.class.getResourceAsStream("/git.properties")) {
+ properties.load(is);
+ sdkVersion = properties.getProperty("git.build.version");
+ } catch (Throwable ex) {
+ if (exceptCounter.shouldPrint()) {
+ logger.error("DataProxy-SDK get version failure", ex);
+ }
+ }
+ return sdkVersion;
+ }
+
public static boolean sleepSomeTime(long sleepTimeMs) {
try {
Thread.sleep(sleepTimeMs);
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index c7e3f773e2..da34136449 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -17,10 +17,10 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
-import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.junit.Assert;
import org.junit.Test;
@@ -47,8 +47,9 @@ public class ProxyConfigManagerTest {
@Test
public void testProxyConfigParse() throws Exception {
- Tuple2<ProxyConfigEntry, String> result =
proxyConfigManager.getLocalProxyListFromFile(localFile);
- ProxyConfigEntry proxyEntry = result.getF0();
+ ProcessResult procResult = new ProcessResult();
+ proxyConfigManager.getLocalProxyListFromFile(localFile, procResult);
+ ProxyConfigEntry proxyEntry = (ProxyConfigEntry)
procResult.getRetData();
Assert.assertEquals(proxyEntry.isInterVisit(), false);
Assert.assertEquals(proxyEntry.getLoad(), 12);
Assert.assertEquals(proxyEntry.getClusterId(), 1);
diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index 1ce9d45d3e..d646c2d1b4 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -31,8 +31,8 @@
<modules>
<module>sdk-common</module>
- <module>sort-sdk</module>
<module>dataproxy-sdk</module>
+ <module>sort-sdk</module>
<module>transform-sdk</module>
<module>dirty-data-sdk</module>
</modules>