This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 45b6e0d53f [INLONG-8766][SDK] SortSDK create consumer in parallel
(#8784)
45b6e0d53f is described below
commit 45b6e0d53f947b87569ae50f01b406e8fc39c0c8
Author: vernedeng <[email protected]>
AuthorDate: Tue Aug 29 16:15:02 2023 +0800
[INLONG-8766][SDK] SortSDK create consumer in parallel (#8784)
---
.../apache/inlong/sdk/sort/api/ClientContext.java | 10 +
.../inlong/sdk/sort/api/ConfigConstants.java | 6 +
.../sdk/sort/api/InlongTopicManagerFactory.java | 9 +
.../inlong/sdk/sort/api/SortClientConfig.java | 36 ++
.../inlong/sdk/sort/api/SortClientFactory.java | 9 +
.../apache/inlong/sdk/sort/api/TopicFetcher.java | 17 +
.../apache/inlong/sdk/sort/api/TopicManager.java | 21 +
.../sdk/sort/impl/QueryConsumeConfigImpl.java | 43 +-
.../inlong/sdk/sort/impl/SortClientImplV2.java | 153 +++++++
.../sdk/sort/manager/InlongMultiTopicManager.java | 3 +
.../sdk/sort/manager/InlongSingleTopicManager.java | 2 +
.../sdk/sort/manager/InlongTopicManager.java | 489 +++++++++++++++++++++
.../inlong/sdk/sort/metrics/SortSdkMetricItem.java | 13 +-
.../metrics/SortSdkPrometheusMetricListener.java | 15 +-
14 files changed, 806 insertions(+), 20 deletions(-)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index 4420fcfc72..fefb4100f9 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -142,6 +142,16 @@ public abstract class ClientContext implements Cleanable {
metricItem.requestManagerParamErrorCount.incrementAndGet();
}
+    /**
+     * Increments the {@code reqeustManagerEmptyCount} counter of the metric item returned by
+     * {@code getMetricItem(null, -1)}, i.e. the item looked up without a topic and with partition id -1.
+     */
+    public void addRequestManagerEmptyError() {
+        this.getMetricItem(null, -1).reqeustManagerEmptyCount.incrementAndGet();
+    }
+
+    /**
+     * Increments the {@code requestManagerTopicsChangeOutOfThreshold} counter of the metric item returned by
+     * {@code getMetricItem(null, -1)}, i.e. the item looked up without a topic and with partition id -1.
+     */
+    public void addRequestManagerTopicsChangeOutOfThreshold() {
+        this.getMetricItem(null, -1).requestManagerTopicsChangeOutOfThreshold.incrementAndGet();
+    }
+
private SortSdkMetricItem getMetricItem(InLongTopic topic, int
partitionId) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
index 32136d470b..52be3a1cb4 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
@@ -54,4 +54,10 @@ public class ConfigConstants {
public static final String IS_TOPIC_STATICS_ENABLED =
"isTopicStaticsEnabled";
public static final String IS_PARTITION_STATICS_ENABLED =
"isPartitionStaticsEnabled";
+ public static final String MAX_OFFLINE_TOPIC = "maxOfflineTopic";
+
+ public static final String START_OFFLINE_CHECK_THRESHOLD =
"startOfflineCheckThreshold";
+
+ public static final String THREAD_POOL_SIZE = "threadPoolSize";
+
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
index 946e926e5c..eb6e9710af 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.sort.api;
import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType;
import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager;
import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
+import org.apache.inlong.sdk.sort.manager.InlongTopicManager;
/**
* Inlong topic manager factory.
@@ -27,6 +28,14 @@ import
org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
*/
public class InlongTopicManagerFactory {
+    /**
+     * Creates a new {@link InlongTopicManager}.
+     *
+     * @param context client context passed to the manager
+     * @param queryConsumeConfig consume-config source passed to the manager
+     * @return the newly created manager
+     * @since 1.9.0
+     */
+    public static TopicManager createInlongTopicManagerV2(ClientContext context,
+            QueryConsumeConfig queryConsumeConfig) {
+        TopicManager manager = new InlongTopicManager(context, queryConsumeConfig);
+        return manager;
+    }
public static TopicManager createInLongTopicManager(
TopicType type,
ClientContext context,
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index 895038490f..b9776265b4 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -72,6 +72,11 @@ public class SortClientConfig implements Serializable {
private boolean topicStaticsEnabled = true;
private boolean partitionStaticsEnabled = true;
+ private int startOfflineTopicCheckThreshold = 50;
+ private int maxOfflineTopicPercent = 5;
+
+ private int threadPoolSize = 50;
+
public SortClientConfig(
String sortTaskId,
String sortClusterName,
@@ -379,6 +384,30 @@ public class SortClientConfig implements Serializable {
return partitionStaticsEnabled;
}
+    /**
+     * @return the percentage threshold (default 5) that the topic-count change of a new config is compared against
+     */
+    public int getMaxOfflineTopicPercent() {
+        return this.maxOfflineTopicPercent;
+    }
+
+    /**
+     * @param maxOfflineTopicPercent new percentage threshold; stored as given, without validation
+     */
+    public void setMaxOfflineTopicPercent(int maxOfflineTopicPercent) {
+        this.maxOfflineTopicPercent = maxOfflineTopicPercent;
+    }
+
+    /**
+     * @return the subscribed-topic count (default 50) below which the topic-change check is skipped
+     */
+    public int getStartOfflineTopicCheckThreshold() {
+        return this.startOfflineTopicCheckThreshold;
+    }
+
+    /**
+     * @param startOfflineTopicCheckThreshold new threshold; stored as given, without validation
+     */
+    public void setStartOfflineTopicCheckThreshold(int startOfflineTopicCheckThreshold) {
+        this.startOfflineTopicCheckThreshold = startOfflineTopicCheckThreshold;
+    }
+
+    /**
+     * @return the configured thread pool size (default 50)
+     */
+    public int getThreadPoolSize() {
+        return this.threadPoolSize;
+    }
+
+    /**
+     * @param threadPoolSize new thread pool size; stored as given, without validation
+     */
+    public void setThreadPoolSize(int threadPoolSize) {
+        this.threadPoolSize = threadPoolSize;
+    }
+
/**
* ConsumeStrategy
*/
@@ -469,6 +498,13 @@ public class SortClientConfig implements Serializable {
Boolean.TRUE.toString());
this.partitionStaticsEnabled =
StringUtils.equalsIgnoreCase(strPartitionStaticsEnabled,
Boolean.TRUE.toString());
+
+ this.maxOfflineTopicPercent =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_OFFLINE_TOPIC),
maxOfflineTopicPercent);
+ this.startOfflineTopicCheckThreshold =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.START_OFFLINE_CHECK_THRESHOLD),
+ startOfflineTopicCheckThreshold);
+ this.threadPoolSize =
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.THREAD_POOL_SIZE),
threadPoolSize);
}
public List<InLongTopic> getConsumerSubset(List<InLongTopic> totalTopics) {
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
index ba18f621e7..86a119751b 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.sort.api;
import org.apache.inlong.sdk.sort.impl.SortClientImpl;
+import org.apache.inlong.sdk.sort.impl.SortClientImplV2;
/**
* Factory of {@link SortClient}
@@ -36,4 +37,12 @@ public class SortClientFactory {
public static SortClient createSortClient(SortClientConfig config,
QueryConsumeConfig queryConsumeConfig) {
return new SortClientImpl(config, queryConsumeConfig);
}
+
+    /**
+     * Creates a {@link SortClientImplV2} from the given config.
+     *
+     * @param config sort client config
+     * @return the new client
+     */
+    public static SortClient createSortClientV2(SortClientConfig config) {
+        SortClient client = new SortClientImplV2(config);
+        return client;
+    }
+
+    /**
+     * Creates a {@link SortClientImplV2} that uses a caller-supplied {@link QueryConsumeConfig}.
+     *
+     * @param config sort client config
+     * @param queryConsumeConfig consume-config source to use
+     * @return the new client
+     */
+    public static SortClient createSortClientV2(SortClientConfig config,
+            QueryConsumeConfig queryConsumeConfig) {
+        SortClient client = new SortClientImplV2(config, queryConsumeConfig);
+        return client;
+    }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
index 536ed3ac45..682e678de0 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.sort.api;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
/**
* Interface of all type of topic fetchers.
@@ -32,6 +33,14 @@ public interface TopicFetcher {
*/
boolean init();
+    /**
+     * Runs {@link #init()} asynchronously via {@link CompletableFuture#supplyAsync(java.util.function.Supplier)},
+     * i.e. on that method's default executor.
+     *
+     * @return future completed with the result of {@link #init()}
+     */
+    default CompletableFuture<Boolean> initAsync() {
+        return CompletableFuture.supplyAsync(() -> this.init());
+    }
+
/**
* Ack message by the given msgOffset.
* @param msgOffset Offset of message.
@@ -61,6 +70,14 @@ public interface TopicFetcher {
*/
boolean close();
+    /**
+     * Runs {@link #close()} asynchronously via {@link CompletableFuture#supplyAsync(java.util.function.Supplier)},
+     * i.e. on that method's default executor.
+     *
+     * @return future completed with the result of {@link #close()}
+     */
+    default CompletableFuture<Boolean> closeAsync() {
+        return CompletableFuture.supplyAsync(() -> this.close());
+    }
+
/**
* Get if the fetcher is closed or not.
* @return Closed or not.
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
index 0759c925f6..8f76142862 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
@@ -50,6 +50,15 @@ public abstract class TopicManager implements Cleanable {
*/
public abstract TopicFetcher removeTopic(InLongTopic topic, boolean
closeFetcher);
+ /**
+ * Remove topic and return the fetcher that has maintained this topic.
+ * <p>
+ * This base implementation removes nothing and always returns {@code null};
+ * subclasses that support removal by key override it.
+ * @param topicKey Topic key to be removed.
+ * @return The fetcher that has maintained this topic.
+ */
+ public TopicFetcher removeTopic(String topicKey) {
+ return null;
+ }
+
/**
* Get the specified fetcher by the given fetch key.
* @param fetchKey Unique fetch key.
@@ -78,4 +87,16 @@ public abstract class TopicManager implements Cleanable {
* Close manager.
*/
public abstract void close();
+
+ /**
+ * Restart assignment. This base implementation does nothing; subclasses may override it.
+ */
+ public void restartAssigned() {
+
+ }
+
+ /**
+ * Stop assignment. This base implementation does nothing; subclasses may override it.
+ */
+ public void stopAssigned() {
+
+ }
+
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index 7e3f1e3d44..32c74013ba 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -28,6 +28,7 @@ import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,10 +53,12 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
private final Logger logger =
LoggerFactory.getLogger(QueryConsumeConfigImpl.class);
private final CloseableHttpClient httpClient = HttpClients.createDefault();
+ private final ObjectMapper mapper = new ObjectMapper();
+
private ClientContext clientContext;
private String md5 = "";
- private Map<String, List<InLongTopic>> subscribedTopic = new HashMap<>();
+ private List<InLongTopic> subscribedTopic = new ArrayList<>();
public QueryConsumeConfigImpl(ClientContext clientContext) {
this.clientContext = clientContext;
@@ -88,7 +90,7 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
String result = EntityUtils.toString(entity);
logger.debug("response String result:{}", result);
try {
- managerResponse = new ObjectMapper().readValue(result,
SortSourceConfigResponse.class);
+ managerResponse = mapper.readValue(result,
SortSourceConfigResponse.class);
return managerResponse;
} catch (Exception e) {
logger.error("parse json to ManagerResponse error:{}",
e.getMessage(), e);
@@ -159,7 +161,7 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
break;
default:
logger.error("return code error:{},request:{},response:{}",
- respCodeValue, getUrl, new
ObjectMapper().writeValueAsString(response));
+ respCodeValue, getUrl,
mapper.writeValueAsString(response));
clientContext.addRequestManagerCommonError();
return true;
}
@@ -168,12 +170,10 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
private void updateSortTaskConf(SortSourceConfigResponse response) {
CacheZoneConfig cacheZoneConfig = response.getData();
- Map<String, List<InLongTopic>> newGroupTopicsMap = new HashMap<>();
+ List<InLongTopic> newGroupTopics = new ArrayList<>();
+
for (Map.Entry<String, CacheZone> entry :
cacheZoneConfig.getCacheZones().entrySet()) {
CacheZone cacheZone = entry.getValue();
-
- List<InLongTopic> topics =
newGroupTopicsMap.computeIfAbsent(cacheZoneConfig.getSortTaskId(),
- k -> new ArrayList<>());
CacheZoneCluster cacheZoneCluster = new
CacheZoneCluster(cacheZone.getZoneName(),
cacheZone.getServiceUrl(), cacheZone.getAuthentication());
for (Topic topicInfo : cacheZone.getTopics()) {
@@ -182,11 +182,32 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
topic.setTopic(topicInfo.getTopic());
topic.setTopicType(cacheZone.getZoneType());
topic.setProperties(topicInfo.getTopicProperties());
- topics.add(topic);
+ newGroupTopics.add(topic);
}
}
- this.subscribedTopic = newGroupTopicsMap;
+        // An empty topic list must not replace the current subscription: count it and keep the old config.
+        if (CollectionUtils.isEmpty(newGroupTopics)) {
+            clientContext.addRequestManagerEmptyError();
+            logger.info("failed to update sort sdk config, the updated conf is empty");
+            return;
+        }
+
+ if (CollectionUtils.isNotEmpty(subscribedTopic) &&
!checkTopics(newGroupTopics)) {
+ clientContext.addRequestManagerTopicsChangeOutOfThreshold();
+ logger.info("failed to update sort sdk config, the updated size
is={}, the old size={}",
+ newGroupTopics.size(), subscribedTopic.size());
+ return;
+ }
+
+ this.subscribedTopic = newGroupTopics;
+ }
+
+    /**
+     * Decides whether a new topic list may replace the current subscription.
+     * The check only applies once the current subscription holds at least
+     * {@code startOfflineTopicCheckThreshold} topics.
+     *
+     * @param newGroupTopics topics parsed from the latest manager response
+     * @return true if the update is acceptable, i.e. the share of currently subscribed topics that would
+     *         disappear is below {@code maxOfflineTopicPercent}
+     */
+    private boolean checkTopics(List<InLongTopic> newGroupTopics) {
+        int currentSize = subscribedTopic.size();
+        // currentSize == 0 is also accepted here so the division below can never divide by zero.
+        if (currentSize == 0 || currentSize < clientContext.getConfig().getStartOfflineTopicCheckThreshold()) {
+            return true;
+        }
+        // Percentage of the current topics that would go offline; negative when the list grows.
+        int offlinePercent = (currentSize - newGroupTopics.size()) * 100 / currentSize;
+        return offlinePercent < clientContext.getConfig().getMaxOfflineTopicPercent();
     }
/**
@@ -198,7 +219,7 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
@Override
public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
reload();
- return new ConsumeConfig(subscribedTopic.get(sortTaskId));
+ return new ConsumeConfig(subscribedTopic);
}
@Override
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
new file mode 100644
index 0000000000..da90b64643
--- /dev/null
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl;
+
+import org.apache.inlong.sdk.sort.api.Cleanable;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.exception.NotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * New version of sort client.
+ * Both constructors build a {@link ClientContextImpl} and a topic manager through
+ * {@link InlongTopicManagerFactory#createInlongTopicManagerV2}; {@link #init()} only logs the config.
+ */
+public class SortClientImplV2 extends SortClient {
+
+    // Log under this class, not SortClientImpl, so V2 messages are attributed correctly.
+    private final String logPrefix = "[" + SortClientImplV2.class.getSimpleName() + "] ";
+    private final Logger logger = LoggerFactory.getLogger(SortClientImplV2.class);
+
+    private final SortClientConfig sortClientConfig;
+
+    private final ClientContext context;
+
+    private final TopicManager inLongTopicManager;
+
+    /**
+     * SortClient Constructor
+     *
+     * @param sortClientConfig SortClientConfig
+     */
+    public SortClientImplV2(SortClientConfig sortClientConfig) {
+        try {
+            this.sortClientConfig = sortClientConfig;
+            this.context = new ClientContextImpl(this.sortClientConfig);
+
+            this.inLongTopicManager = InlongTopicManagerFactory
+                    .createInlongTopicManagerV2(context, new QueryConsumeConfigImpl(context));
+        } catch (Exception e) {
+            // Release whatever was created so far, then let the caller see the original failure.
+            logger.error(logPrefix + "failed to create sort client", e);
+            this.close();
+            throw e;
+        }
+    }
+
+    /**
+     * SortClient Constructor with user defined QueryConsumeConfig
+     *
+     * @param sortClientConfig SortClientConfig
+     * @param queryConsumeConfig QueryConsumeConfig; it is configured with the new context before use
+     */
+    public SortClientImplV2(SortClientConfig sortClientConfig, QueryConsumeConfig queryConsumeConfig) {
+        try {
+            this.sortClientConfig = sortClientConfig;
+            this.context = new ClientContextImpl(this.sortClientConfig);
+            queryConsumeConfig.configure(context);
+            this.inLongTopicManager = InlongTopicManagerFactory
+                    .createInlongTopicManagerV2(context, queryConsumeConfig);
+        } catch (Exception e) {
+            // Report through the logger instead of printStackTrace, release resources, then rethrow.
+            logger.error(logPrefix + "failed to create sort client", e);
+            this.close();
+            throw e;
+        }
+    }
+
+    /**
+     * init SortClient; this implementation only logs the config.
+     *
+     * @return always true
+     * @throws Throwable declared by the parent contract; not thrown here
+     */
+    @Override
+    public boolean init() throws Throwable {
+        logger.info(logPrefix + "init|" + sortClientConfig);
+        return true;
+    }
+
+    /**
+     * ack offset to msgKey
+     *
+     * @param msgKey key used to look up the fetcher
+     * @param msgOffset offset passed to the fetcher's ack
+     * @throws Exception {@link NotExistException} when no fetcher is registered for msgKey, or whatever
+     *         the fetcher's ack throws
+     */
+    @Override
+    public void ack(String msgKey, String msgOffset)
+            throws Exception {
+        logger.debug("ack:{} offset:{}", msgKey, msgOffset);
+        TopicFetcher topicFetcher = getFetcher(msgKey);
+        topicFetcher.ack(msgOffset);
+    }
+
+    /**
+     * close SortClient: cleans the topic manager first, then the context.
+     *
+     * @return true only if both clean calls succeeded
+     */
+    @Override
+    public boolean close() {
+        boolean cleanInLongTopicManager = doClose(inLongTopicManager);
+        boolean cleanContext = doClose(context);
+
+        logger.info(logPrefix
+
+                + "|cleanInLongTopicManager=" + cleanInLongTopicManager
+                + "|cleanContext=" + cleanContext);
+        return (cleanInLongTopicManager && cleanContext);
+    }
+
+    @Override
+    public SortClientConfig getConfig() {
+        return this.sortClientConfig;
+    }
+
+    /**
+     * Looks up the fetcher registered under msgKey.
+     *
+     * @throws NotExistException when the topic manager has no fetcher for msgKey
+     */
+    private TopicFetcher getFetcher(String msgKey) throws NotExistException {
+        TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey);
+        if (topicFetcher == null) {
+            throw new NotExistException(msgKey + " not exist.");
+        }
+        return topicFetcher;
+    }
+
+    /**
+     * Cleans c if it is non-null. A null argument counts as success, which lets close() run safely
+     * from a constructor that failed before all fields were assigned.
+     *
+     * @return result of {@code c.clean()}, true for null, false if clean threw
+     */
+    private boolean doClose(Cleanable c) {
+        try {
+            if (c != null) {
+                return c.clean();
+            }
+            return true;
+        } catch (Throwable th) {
+            logger.error(logPrefix + "clean error.", th);
+            return false;
+        }
+    }
+}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
index de30a3f308..744b38d223 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
@@ -56,7 +56,10 @@ import java.util.stream.Collectors;
* Inlong manager that maintain the {@link
org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}.
* It is suitable to the cases that topics share the same configurations.
* And each consumer will consume multi topic.
+ *
+ * InlongMultiTopicManager was deprecated since 1.9.0
*/
+@Deprecated
public class InlongMultiTopicManager extends TopicManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongMultiTopicManager.class);
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
index a59e81d80d..503304c77f 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
@@ -57,7 +57,9 @@ import java.util.stream.Collectors;
* Inlong manager that maintain the single topic fetchers.
* It is suitable to the cases that each topic has its own configurations.
* And each consumer only consume the very one topic.
+ * InlongSingleTopicManager was deprecated since 1.9.0
*/
+@Deprecated
public class InlongSingleTopicManager extends TopicManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongSingleTopicManager.class);
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
new file mode 100644
index 0000000000..f594126152
--- /dev/null
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.manager;
+
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Inlong manager that maintain the single topic fetchers.
+ * It is suitable to the cases that each topic has its own configurations.
+ * And each consumer only consume the very one topic.
+ * InlongTopicManager has been used since 1.9.0
+ */
+public class InlongTopicManager extends TopicManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongTopicManager.class);
+
+ private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ private final Map<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
+ private final Map<String, PulsarClient> pulsarClients = new
ConcurrentHashMap<>();
+ private final Map<String, TubeConsumerCreator> tubeFactories = new
ConcurrentHashMap<>();
+
+ protected final ForkJoinPool pool;
+
+ private volatile boolean stopAssign = false;
+
+ private Collection<InLongTopic> assignedTopics;
+
+    /**
+     * Creates the manager, its worker pool and the periodic meta-data update task.
+     *
+     * @param context client context; supplies the pool size and the update interval
+     * @param queryConsumeConfig source of the consume config polled by the update task
+     */
+    public InlongTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
+        super(context, queryConsumeConfig);
+        // Assign the pool before scheduling: the first update runs with zero delay on another thread and
+        // submits work to the pool, so it must never observe the field unassigned.
+        pool = new ForkJoinPool(context.getConfig().getThreadPoolSize());
+        executor.scheduleWithFixedDelay(this::updateMetaData, 0L,
+                context.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
+    }
+
+ /**
+ * Closes all fetchers, pulsar clients and tube factories held by this manager.
+ * Assignment is paused (stopAssign = true) while the close calls are issued; in every case the three
+ * maps are cleared and stopAssign is reset to false in the finally block.
+ *
+ * @return true if the close calls returned without throwing, false otherwise
+ */
+ @Override
+ public boolean clean() {
+ String sortTaskId = context.getConfig().getSortTaskId();
+ try {
+ LOGGER.info("start to clean topic manager, sortTaskId={}",
sortTaskId);
+ stopAssign = true;
+ closeAllFetchers();
+ closeAllPulsarClients();
+ closeAllTubeFactories();
+ LOGGER.info("success to clean topic manager, sortTaskId={}",
sortTaskId);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("failed to clean topic manager, sortTaskId={}",
sortTaskId, e);
+ } finally {
+ // NOTE(review): the maps are cleared as soon as the closeAll* calls return - confirm the
+ // close work has finished by then. The scheduled executor is not stopped here.
+ fetchers.clear();
+ pulsarClients.clear();
+ tubeFactories.clear();
+ stopAssign = false;
+ }
+ return false;
+ }
+
+    /**
+     * Removes the fetcher registered under topicKey and closes it.
+     * A close failure is logged and does not propagate; the offline counter is only incremented
+     * after a close that did not throw.
+     *
+     * @param topicKey key of the fetcher to remove
+     * @return the removed fetcher, or null if none was registered
+     */
+    @Override
+    public TopicFetcher removeTopic(String topicKey) {
+        LOGGER.info("start to close fetcher key={} ", topicKey);
+        TopicFetcher removed = fetchers.remove(topicKey);
+        if (removed == null) {
+            return null;
+        }
+        try {
+            removed.close();
+            context.addTopicOfflineCount(1);
+        } catch (Exception e) {
+            LOGGER.error("close fetcher failed, key={}", topicKey, e);
+        }
+        return removed;
+    }
+
+    /**
+     * Closes every registered fetcher in parallel on the pool and waits until all of them are processed.
+     */
+    private void closeAllFetchers() {
+        // join(): callers such as clean() clear the fetcher map right afterwards, so the close work
+        // must be complete before this method returns.
+        pool.submit(() -> fetchers.keySet()
+                .stream()
+                .parallel()
+                .forEach(this::removeTopic))
+                .join();
+    }
+
+    /**
+     * Closes every registered pulsar client in parallel on the pool and waits until all of them are processed.
+     */
+    private void closeAllPulsarClients() {
+        // join(): clean() clears the client map right afterwards, so closing must be complete first.
+        pool.submit(() -> pulsarClients.keySet()
+                .stream()
+                .parallel()
+                .forEach(this::closePulsarClient))
+                .join();
+    }
+
+    /**
+     * Shuts down every registered tube factory in parallel on the pool and waits until all of them
+     * are processed.
+     */
+    private void closeAllTubeFactories() {
+        // join(): clean() clears the factory map right afterwards, so shutdown must be complete first.
+        pool.submit(() -> tubeFactories.keySet()
+                .stream()
+                .parallel()
+                .forEach(this::closeTubeFactory))
+                .join();
+    }
+
+    /**
+     * Removes the tube creator registered for clusterId and shuts down its session factory.
+     * A shutdown failure is logged and does not propagate.
+     *
+     * @param clusterId id of the cluster whose creator is removed
+     * @return the removed creator, or null if none was registered
+     */
+    private TubeConsumerCreator closeTubeFactory(String clusterId) {
+        LOGGER.info("start to close tube creator id = {}", clusterId);
+        TubeConsumerCreator creator = tubeFactories.remove(clusterId);
+        try {
+            if (creator != null) {
+                creator.getMessageSessionFactory().shutdown();
+            }
+        } catch (Exception e) {
+            // Pass the exception as the last argument so the cause and stack trace are logged.
+            LOGGER.error("close tube factory failed, client id = {}", clusterId, e);
+        }
+        return creator;
+    }
+
+    /**
+     * Removes the pulsar client registered for clusterId and closes it.
+     * A close failure is logged and does not propagate.
+     *
+     * @param clusterId id of the cluster whose client is removed
+     * @return the removed client, or null if none was registered
+     */
+    private PulsarClient closePulsarClient(String clusterId) {
+        LOGGER.info("start to close pulsar client id = {}", clusterId);
+        PulsarClient client = pulsarClients.remove(clusterId);
+        try {
+            if (client != null) {
+                client.close();
+            }
+        } catch (Exception e) {
+            // Pass the exception as the last argument so the cause and stack trace are logged.
+            LOGGER.error("close pulsar client failed, client id = {}", clusterId, e);
+        }
+        return client;
+    }
+
+ /**
+ * Adds a topic: calls checkAndOnlineCluster for it, then returns the result of onlineNewTopic.
+ * NOTE(review): both helpers are defined outside this excerpt; presumably the first prepares the
+ * cluster client the second needs - confirm.
+ *
+ * @param topic topic to add
+ * @return whatever onlineNewTopic returns for the topic
+ */
+ @Override
+ public TopicFetcher addTopic(InLongTopic topic) {
+ checkAndOnlineCluster(topic);
+ return onlineNewTopic(topic);
+ }
+
+    /**
+     * Removes the fetcher registered under the topic's key and optionally closes it.
+     * Unlike {@code removeTopic(String)}, a failure of close() is not caught here.
+     *
+     * @param topic topic whose fetcher is removed
+     * @param closeFetcher whether to close the removed fetcher
+     * @return the removed fetcher, or null if none was registered
+     */
+    @Override
+    public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) {
+        LOGGER.info("start to remove topicKey={}", topic.getTopicKey());
+        TopicFetcher removed = fetchers.remove(topic.getTopicKey());
+        if (removed == null || !closeFetcher) {
+            return removed;
+        }
+        removed.close();
+        return removed;
+    }
+
+ /**
+ * Looks up the fetcher registered under fetchKey.
+ *
+ * @param fetchKey key of the fetcher
+ * @return the fetcher, or null if none is registered under that key
+ */
+ @Override
+ public TopicFetcher getFetcher(String fetchKey) {
+ return fetchers.get(fetchKey);
+ }
+
+    /**
+     * @return a new list holding the fetchers registered at the time of the call; later changes to the
+     *         manager are not reflected in it
+     */
+    @Override
+    public Collection<TopicFetcher> getAllFetchers() {
+        Collection<TopicFetcher> snapshot = new ArrayList<>(fetchers.values());
+        return snapshot;
+    }
+
+    /**
+     * @return a new set holding the fetcher keys registered at the time of the call; later changes to the
+     *         manager are not reflected in it
+     */
+    @Override
+    public Set<String> getManagedInLongTopics() {
+        Set<String> snapshot = new HashSet<>(fetchers.keySet());
+        return snapshot;
+    }
+
+ /**
+ * Pauses assignment (stopAssign = true) and closes all fetchers.
+ * stopAssign is left true afterwards; restartAssigned() resets it.
+ */
+ @Override
+ public void offlineAllTopicsAndPartitions() {
+ stopAssign = true;
+ closeAllFetchers();
+ }
+
+ /**
+ * Shuts down the scheduled executor if it is still running, then calls clean().
+ * NOTE(review): the ForkJoinPool {@code pool} is not shut down here - confirm that is intended.
+ */
+ @Override
+ public void close() {
+ if (!executor.isShutdown()) {
+ executor.shutdown();
+ }
+ clean();
+ }
+
+ /**
+ * Re-enables assignment by clearing the stopAssign flag that updateMetaData checks.
+ */
+ @Override
+ public void restartAssigned() {
+ stopAssign = false;
+ }
+
+ /**
+ * Pauses assignment by setting the stopAssign flag; updateMetaData returns early while it is set.
+ */
+ @Override
+ public void stopAssigned() {
+ stopAssign = true;
+ }
+
+    /**
+     * Periodic task: queries the current consume config and applies it.
+     * Does nothing while assignment is stopped or when no config source is set. A null config is counted
+     * as a failed manager request.
+     */
+    private void updateMetaData() {
+        try {
+            LOGGER.debug("InLongTopicManager doWork");
+            if (stopAssign) {
+                LOGGER.warn("assign is stopped");
+                return;
+            }
+            if (queryConsumeConfig == null) {
+                LOGGER.error("subscribedMetaDataInfo is null");
+                return;
+            }
+            // get sortTask conf from manager
+            long start = System.currentTimeMillis();
+            context.addRequestManager();
+            ConsumeConfig consumeConfig = queryConsumeConfig
+                    .queryCurrentConsumeConfig(context.getConfig().getSortTaskId());
+            if (consumeConfig == null) {
+                LOGGER.warn("subscribedInfo is null");
+                context.addRequestManagerFail(System.currentTimeMillis() - start);
+                return;
+            }
+            this.assignedTopics = new HashSet<>(context.getConfig().getConsumerSubset(consumeConfig.getTopics()));
+            handleUpdatedConsumeConfig();
+        } catch (Throwable t) {
+            // This runs under scheduleWithFixedDelay: an exception escaping the task would suppress every
+            // later execution, so log it and let the next run retry.
+            LOGGER.error("failed to update meta data", t);
+        }
+    }
+
+ /**
+ * Applies the topics currently stored in assignedTopics; does nothing when that collection is
+ * null or empty. The steps run in a fixed order: online cluster clients, offline removed topics,
+ * online new topics, update current topics, offline unused cluster clients.
+ */
+ private void handleUpdatedConsumeConfig() {
+ LOGGER.info("start to handle updated consume config");
+ if (CollectionUtils.isEmpty(assignedTopics)) {
+ LOGGER.warn("assignedTopics is null or empty, do nothing");
+ return;
+ }
+ // NOTE(review): onlineTubeFactories only submits its work to the pool; confirm the topic steps
+ // below do not need the cluster clients to exist already.
+ this.onlinePulsarClients();
+ this.onlineTubeFactories();
+ this.offlineRemovedTopics();
+ this.onlineNewTopics();
+ this.updateCurrentTopics();
+ this.offlinePulsarClients();
+ this.offlineTubeFactories();
+ LOGGER.info("end to handle updated consume config");
+ }
+
+    /**
+     * Submits a pool task that shuts down every tube factory whose cluster id is no longer among the
+     * assigned TUBE clusters. The set of ids to keep is computed on the calling thread; the set of
+     * registered ids is snapshotted inside the task.
+     */
+    private void offlineTubeFactories() {
+        Set<String> stillAssigned = this.getCacheZoneClusters(InlongTopicTypeEnum.TUBE).stream()
+                .map(CacheZoneCluster::getClusterId)
+                .filter(tubeFactories::containsKey)
+                .collect(Collectors.toSet());
+        pool.submit(() -> {
+            Set<String> registered = new HashSet<>(tubeFactories.keySet());
+            registered.stream()
+                    .parallel()
+                    .filter(id -> !stillAssigned.contains(id))
+                    .forEach(this::offlineTubeFactory);
+        });
+    }
+
+    /**
+     * Removes the tube creator registered under clientId and shuts down its session factory.
+     * A missing entry or a shutdown failure is logged and does not propagate.
+     *
+     * @param clientId key of the tube creator to take offline
+     */
+    private void offlineTubeFactory(String clientId) {
+        TubeConsumerCreator client = tubeFactories.remove(clientId);
+        if (client == null) {
+            LOGGER.warn("when close tube client, find no client id={}", clientId);
+            return;
+        }
+        LOGGER.info("start to close tube clientId={}", clientId);
+        try {
+            client.getMessageSessionFactory().shutdown();
+            LOGGER.info("success to close tube clientId={}", clientId);
+        } catch (Exception e) {
+            // Pass the exception as the last argument so the cause and stack trace are logged.
+            LOGGER.warn("failed to close tube clientId={}", clientId, e);
+        }
+    }
+
+    /**
+     * Submits a pool task that closes every pulsar client whose cluster id is no longer among the
+     * assigned PULSAR clusters. The set of ids to keep is computed on the calling thread; the set of
+     * registered ids is snapshotted inside the task.
+     */
+    private void offlinePulsarClients() {
+        Set<String> stillAssigned = this.getCacheZoneClusters(InlongTopicTypeEnum.PULSAR).stream()
+                .map(CacheZoneCluster::getClusterId)
+                .filter(pulsarClients::containsKey)
+                .collect(Collectors.toSet());
+        pool.submit(() -> {
+            Set<String> registered = new HashSet<>(pulsarClients.keySet());
+            registered.stream()
+                    .parallel()
+                    .filter(id -> !stillAssigned.contains(id))
+                    .forEach(this::offlinePulsarClient);
+        });
+    }
+
+ /**
+  * Removes the Pulsar client registered under the given id and closes it.
+  * A failed close is logged and otherwise ignored; the entry is removed from the map either way.
+  *
+  * @param clientId key of the entry in {@code pulsarClients} to close
+  */
+ private void offlinePulsarClient(String clientId) {
+     PulsarClient client = pulsarClients.remove(clientId);
+     if (client == null) {
+         LOGGER.warn("when close pulsar client, find no client id={}", clientId);
+         return;
+     }
+     LOGGER.info("start to close pulsar clientId={}", clientId);
+     try {
+         client.close();
+         LOGGER.info("success to close pulsar clientId={}", clientId);
+     } catch (Exception e) {
+         // hand the exception to the logger so the cause of the failed close is not lost
+         LOGGER.warn("failed to close pulsar clientId={}", clientId, e);
+     }
+ }
+
+ /**
+  * Submits a task that builds a Tube consumer creator for each assigned Tube cluster
+  * that had no entry in {@code tubeFactories} at the moment this method was called.
+  */
+ private void onlineTubeFactories() {
+     List<CacheZoneCluster> missing = this.getCacheZoneClusters(InlongTopicTypeEnum.TUBE)
+             .stream()
+             .filter(candidate -> !tubeFactories.containsKey(candidate.getClusterId()))
+             .collect(Collectors.toList());
+     pool.submit(() -> {
+         missing.stream().parallel().forEach(this::createTubeConsumerCreator);
+     });
+ }
+
+ /**
+  * Creates a Tube session factory for the cluster and registers it in {@code tubeFactories}.
+  * If a creator is already registered for the cluster id, the freshly built session factory is
+  * shut down and the registered one is kept. Failures are logged; nothing is thrown to the caller.
+  *
+  * @param cluster cluster whose bootstraps are used to build the Tube client config
+  */
+ private void createTubeConsumerCreator(CacheZoneCluster cluster) {
+     LOGGER.info("start to init tube creator for cluster={}", cluster);
+     if (cluster.getBootstraps() == null) {
+         LOGGER.error("bootstrap is null for cluster={}", cluster);
+         return;
+     }
+     try {
+         TubeClientConfig tubeConfig = new TubeClientConfig(cluster.getBootstraps());
+         MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig);
+         TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory, tubeConfig);
+         TubeConsumerCreator oldCreator = tubeFactories.putIfAbsent(cluster.getClusterId(), tubeConsumerCreator);
+         if (oldCreator != null) {
+             // another creator was registered first; release the one built here
+             LOGGER.warn("close new tube creator for cluster={}", cluster);
+             tubeConsumerCreator.getMessageSessionFactory().shutdown();
+         }
+     } catch (Exception e) {
+         LOGGER.error("create tube creator error for cluster={}", cluster, e);
+         return;
+     }
+     // message previously read "creatorfor" (missing space)
+     LOGGER.info("success to init tube creator for cluster={}", cluster);
+ }
+
+ /**
+  * Submits a task that builds a Pulsar client for each assigned Pulsar cluster
+  * that had no entry in {@code pulsarClients} at the moment this method was called.
+  */
+ private void onlinePulsarClients() {
+     List<CacheZoneCluster> missing = this.getCacheZoneClusters(InlongTopicTypeEnum.PULSAR)
+             .stream()
+             .filter(candidate -> !pulsarClients.containsKey(candidate.getClusterId()))
+             .collect(Collectors.toList());
+     pool.submit(() -> {
+         missing.stream().parallel().forEach(this::createPulsarClient);
+     });
+ }
+
+ /**
+  * Builds a Pulsar client for the cluster and registers it in {@code pulsarClients}.
+  *
+  * <p>If a live client is already registered for the cluster id, the new client is closed and the
+  * registered one is kept. If the registered client is already closed, it is replaced by the new
+  * one, so the new client is never left unregistered and unclosed. Failures are logged; nothing
+  * is thrown to the caller.
+  *
+  * @param cluster cluster providing the service url (bootstraps) and the authentication token
+  */
+ private void createPulsarClient(CacheZoneCluster cluster) {
+     LOGGER.info("start to init pulsar client for cluster={}", cluster);
+     if (cluster.getBootstraps() == null) {
+         LOGGER.error("bootstrap is null for cluster={}", cluster);
+         return;
+     }
+     try {
+         PulsarClient pulsarClient = PulsarClient.builder()
+                 .serviceUrl(cluster.getBootstraps())
+                 .authentication(AuthenticationFactory.token(cluster.getToken()))
+                 .build();
+         PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
+         if (oldClient != null) {
+             // a closed client in the map is useless: swap it for the new one instead of leaking the new one
+             boolean replaced = oldClient.isClosed()
+                     && pulsarClients.replace(cluster.getClusterId(), oldClient, pulsarClient);
+             if (replaced) {
+                 LOGGER.warn("replace closed pulsar client for cluster={}", cluster);
+             } else {
+                 LOGGER.warn("close new pulsar client for cluster={}", cluster);
+                 pulsarClient.close();
+             }
+         }
+     } catch (Exception e) {
+         LOGGER.error("create pulsar client error for cluster={}", cluster, e);
+         return;
+     }
+     LOGGER.info("success to init pulsar client for cluster={}", cluster);
+ }
+
+ /**
+  * Lists the distinct clusters referenced by the assigned topics of the given type.
+  * The topic type is compared with the enum name ignoring case.
+  *
+  * @param type topic type to select
+  * @return distinct clusters of all assigned topics with that type
+  */
+ private List<CacheZoneCluster> getCacheZoneClusters(InlongTopicTypeEnum type) {
+     return assignedTopics
+             .stream()
+             .filter(assigned -> {
+                 String expected = type.getName();
+                 return expected.equalsIgnoreCase(assigned.getTopicType());
+             })
+             .map(assigned -> assigned.getInLongCluster())
+             .distinct()
+             .collect(Collectors.toList());
+ }
+
+ /**
+  * Makes sure a client exists for the cluster of the given topic, creating one when the
+  * matching cache has no entry for the cluster id. Only "pulsar" and "tube" are handled;
+  * any other type is reported as unsupported.
+  *
+  * @param topic topic whose cluster should have a client
+  */
+ private void checkAndOnlineCluster(InLongTopic topic) {
+     String loweredType = topic.getTopicType().toLowerCase();
+     if ("pulsar".equals(loweredType)) {
+         boolean known = pulsarClients.containsKey(topic.getInLongCluster().getClusterId());
+         if (!known) {
+             createPulsarClient(topic.getInLongCluster());
+         }
+     } else if ("tube".equals(loweredType)) {
+         boolean known = tubeFactories.containsKey(topic.getInLongCluster().getClusterId());
+         if (!known) {
+             createTubeConsumerCreator(topic.getInLongCluster());
+         }
+     } else {
+         LOGGER.error("do not support type={}", topic.getTopicType());
+     }
+ }
+
+ /**
+  * Subscribes to the given topic with the fetcher builder matching its type
+  * (pulsar, kafka or tube, compared ignoring case).
+  *
+  * @param topic topic to subscribe
+  * @return the fetcher returned by the builder, or {@code null} when the type is not
+  *         supported or any exception is raised while subscribing
+  */
+ private TopicFetcher onlineNewTopic(InLongTopic topic) {
+     TopicFetcher subscribed = null;
+     try {
+         String topicType = topic.getTopicType();
+         if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topicType)) {
+             LOGGER.info("the topic is pulsar {}", topic);
+             PulsarClient pulsarClient = pulsarClients.get(topic.getInLongCluster().getClusterId());
+             subscribed = TopicFetcherBuilder.newPulsarBuilder()
+                     .pulsarClient(pulsarClient)
+                     .topic(topic)
+                     .context(context)
+                     .subscribe();
+         } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topicType)) {
+             LOGGER.info("the topic is kafka {}", topic);
+             subscribed = TopicFetcherBuilder.newKafkaBuilder()
+                     .bootstrapServers(topic.getInLongCluster().getBootstraps())
+                     .topic(topic)
+                     .context(context)
+                     .subscribe();
+         } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topicType)) {
+             LOGGER.info("the topic is tube {}", topic);
+             TubeConsumerCreator creator = tubeFactories.get(topic.getInLongCluster().getClusterId());
+             subscribed = TopicFetcherBuilder.newTubeBuilder()
+                     .tubeConsumerCreater(creator)
+                     .topic(topic)
+                     .context(context)
+                     .subscribe();
+         } else {
+             LOGGER.error("topic type not support " + topicType);
+         }
+     } catch (Exception e) {
+         // the assignment above never completed, so the result stays null
+         LOGGER.error("failed to subscribe new topic={}", topic, e);
+     }
+     return subscribed;
+ }
+
+ /**
+  * Submits a task that adds, in parallel, every assigned topic that has no fetcher yet.
+  * The topic list is computed when the task runs, not when it is submitted.
+  */
+ private void onlineNewTopics() {
+     pool.submit(() -> {
+         List<InLongTopic> pending = getOnlineTopics();
+         pending.stream().parallel().forEach(this::addTopic);
+     });
+ }
+
+ /**
+  * Submits a task that updates, in parallel, every assigned topic that already has a fetcher.
+  * The topic list is computed when the task runs, not when it is submitted.
+  */
+ private void updateCurrentTopics() {
+     pool.submit(() -> {
+         List<InLongTopic> existing = getUpdateTopics();
+         existing.stream().parallel().forEach(this::updateTopic);
+     });
+ }
+
+ /**
+  * Passes the topic to the fetcher registered under its topic key.
+  * Logs a warning and does nothing when no fetcher is registered for that key.
+  *
+  * @param topic topic carrying the updated configuration
+  */
+ private void updateTopic(InLongTopic topic) {
+     TopicFetcher registered = fetchers.get(topic.getTopicKey());
+     if (registered != null) {
+         registered.updateTopics(Collections.singletonList(topic));
+     } else {
+         LOGGER.warn("when update topic, find no topic={}", topic);
+     }
+ }
+
+ /**
+  * Selects the assigned topics that need a new fetcher.
+  *
+  * @return distinct assigned topics whose topic key is not a key of {@code fetchers}
+  */
+ private List<InLongTopic> getOnlineTopics() {
+     return assignedTopics
+             .stream()
+             .filter(candidate -> {
+                 boolean alreadyFetched = fetchers.containsKey(candidate.getTopicKey());
+                 return !alreadyFetched;
+             })
+             .distinct()
+             .collect(Collectors.toList());
+ }
+
+ /**
+  * Submits a task that removes, in parallel, every topic that still has a fetcher but is no
+  * longer part of the assigned topics.
+  *
+  * <p>The keys are taken from {@code fetchers} itself. The previous implementation went through
+  * {@link #getOfflineTopics()}, which only ever yields assigned topics without a fetcher, so
+  * topics dropped from the assignment were never removed.
+  */
+ private void offlineRemovedTopics() {
+     pool.submit(() -> getOfflineTopicKeys().stream().parallel()
+             .forEach(this::removeTopic));
+ }
+
+ /**
+  * Computes the keys of {@code fetchers} that do not belong to any currently assigned topic.
+  *
+  * @return a fresh set of topic keys whose fetchers should be taken offline
+  */
+ private Set<String> getOfflineTopicKeys() {
+     Set<String> assignedKeys = assignedTopics.stream()
+             .map(InLongTopic::getTopicKey)
+             .collect(Collectors.toSet());
+     // copy the key set so that removing fetchers later cannot disturb the iteration
+     Set<String> offlineKeys = new HashSet<>(fetchers.keySet());
+     offlineKeys.removeAll(assignedKeys);
+     return offlineKeys;
+ }
+
+ /**
+  * NOTE(review): despite its name, this returns the assigned topics that have no fetcher,
+  * i.e. the same topics as getOnlineTopics(). It is no longer used by offlineRemovedTopics()
+  * and is kept unchanged only in case other code in this class still calls it.
+  *
+  * @return distinct assigned topics whose topic key is not a key of {@code fetchers}
+  */
+ private List<InLongTopic> getOfflineTopics() {
+     Set<String> intersection = assignedTopics.stream()
+             .map(InLongTopic::getTopicKey)
+             .filter(fetchers::containsKey)
+             .collect(Collectors.toSet());
+
+     return assignedTopics.stream()
+             .filter(topic -> !intersection.contains(topic.getTopicKey()))
+             .distinct()
+             .collect(Collectors.toList());
+
+ }
+
+ /**
+  * Selects the assigned topics that already have a fetcher and therefore only need an update.
+  *
+  * @return distinct assigned topics whose topic key is a key of {@code fetchers}
+  */
+ private List<InLongTopic> getUpdateTopics() {
+     return assignedTopics
+             .stream()
+             .filter(candidate -> {
+                 boolean alreadyFetched = fetchers.containsKey(candidate.getTopicKey());
+                 return alreadyFetched;
+             })
+             .distinct()
+             .collect(Collectors.toList());
+ }
+}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
index 1282fea5c7..d85db642d8 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
@@ -58,9 +58,12 @@ public class SortSdkMetricItem extends MetricItem {
public static final String M_REQUEST_MANAGER_COUNT = "requestManagerCount";
public static final String M_REQUEST_MANAGER_TIME_COST =
"requestManagerTimeCost";
public static final String M_REQUEST_MANAGER_FAIL_COUNT =
"requestManagerFailCount";
- public static final String M_REQUEST_MANAGER_CONF_CHANAGED_COUNT =
"requestManagerConfChanagedCount";
- public static final String M_RQUEST_MANAGER_COMMON_ERROR_COUNT =
"requestManagerCommonErrorCount";
- public static final String M_RQUEST_MANAGER_PARAM_ERROR_COUNT =
"requestManagerParamErrorCount";
+ public static final String M_REQEUST_MANAGER_EMPTY_COUNT =
"requestManagerEmptyCount";
+ public static final String
M_REQUEST_MANAGER_TOPICS_CHANGE_OUT_OF_THRESHOLD =
+ "requestManagerTopicsChangeOutOfThreshold";
+ public static final String M_REQUEST_MANAGER_CONF_CHANGED_COUNT =
"requestManagerConfChangedCount";
+ public static final String M_REQUEST_MANAGER_COMMON_ERROR_COUNT =
"requestManagerCommonErrorCount";
+ public static final String M_REQUEST_MANAGER_PARAM_ERROR_COUNT =
"requestManagerParamErrorCount";
@Dimension
public String sortTaskId;
@@ -113,6 +116,10 @@ public class SortSdkMetricItem extends MetricItem {
public AtomicLong requestManagerCommonErrorCount = new AtomicLong(0);
@CountMetric
public AtomicLong requestManagerParamErrorCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong reqeustManagerEmptyCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong requestManagerTopicsChangeOutOfThreshold = new
AtomicLong(0);
public SortSdkMetricItem() {
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
index 95898de333..9ec89ea066 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
@@ -87,11 +87,12 @@ public class SortSdkPrometheusMetricListener extends
Collector implements Metric
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_COUNT,
metricItem.requestManagerCount);
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_TIME_COST,
metricItem.requestManagerTimeCost);
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_FAIL_COUNT,
metricItem.requestManagerFailCount);
-
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_CONF_CHANAGED_COUNT,
+ metricValueMap.put(SortSdkMetricItem.M_REQEUST_MANAGER_EMPTY_COUNT,
metricItem.reqeustManagerEmptyCount);
+
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_CONF_CHANGED_COUNT,
metricItem.requestManagerConfChangedCount);
-
metricValueMap.put(SortSdkMetricItem.M_RQUEST_MANAGER_COMMON_ERROR_COUNT,
+
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_COMMON_ERROR_COUNT,
metricItem.requestManagerCommonErrorCount);
-
metricValueMap.put(SortSdkMetricItem.M_RQUEST_MANAGER_PARAM_ERROR_COUNT,
+
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_PARAM_ERROR_COUNT,
metricItem.requestManagerParamErrorCount);
this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
@@ -123,12 +124,14 @@ public class SortSdkPrometheusMetricListener extends
Collector implements Metric
metricItem.requestManagerTimeCost.get());
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_FAIL_COUNT),
metricItem.requestManagerFailCount.get());
-
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_CONF_CHANAGED_COUNT),
+
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_CONF_CHANGED_COUNT),
metricItem.requestManagerConfChangedCount.get());
-
totalCounter.addMetric(Collections.singletonList(M_RQUEST_MANAGER_PARAM_ERROR_COUNT),
+
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_PARAM_ERROR_COUNT),
metricItem.requestManagerParamErrorCount.get());
-
totalCounter.addMetric(Collections.singletonList(M_RQUEST_MANAGER_COMMON_ERROR_COUNT),
+
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_COMMON_ERROR_COUNT),
metricItem.requestManagerCommonErrorCount.get());
+
totalCounter.addMetric(Collections.singletonList(M_REQEUST_MANAGER_EMPTY_COUNT),
+ metricItem.reqeustManagerEmptyCount.get());
List<MetricFamilySamples> mfs = new ArrayList<>();
mfs.add(totalCounter);
return mfs;