This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new eb7529a Support large service list query and replace deprecated API
of Armeria (#8628)
eb7529a is described below
commit eb7529adb6017d6509cb2c081c53e6cbc29f62c9
Author: kezhenxu94 <[email protected]>
AuthorDate: Tue Mar 8 13:18:18 2022 +0800
Support large service list query and replace deprecated API of Armeria
(#8628)
---
CHANGES.md | 3 +
docs/en/setup/backend/configuration-vocabulary.md | 3 +-
.../client/elasticsearch/ElasticSearchClient.java | 23 +++++---
.../library/elasticsearch/ElasticSearch.java | 17 ++++--
.../library/elasticsearch/client/SearchClient.java | 34 ++++++++++-
.../requests/factory/SearchFactory.java | 10 +++-
.../factory/common/CommonSearchFactory.java | 23 ++++++--
.../elasticsearch/requests/search/Scroll.java | 37 ++++++++++++
.../requests/search/ScrollBuilder.java | 51 +++++++++++++++++
.../requests/search/SearchParams.java | 66 ++++++++++++++++++++++
.../response/search/SearchResponse.java | 3 +
.../src/main/resources/application.yml | 5 +-
.../StorageModuleElasticsearchConfig.java | 6 ++
.../StorageModuleElasticsearchProvider.java | 6 +-
.../storage/plugin/elasticsearch/base/EsDAO.java | 2 +
.../cache/NetworkAddressAliasEsDAO.java | 33 ++++++++---
.../elasticsearch/query/MetadataQueryEsDAO.java | 55 ++++++++++++++----
17 files changed, 332 insertions(+), 45 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0e1edb5..dc49f34 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -82,6 +82,9 @@ Release Notes.
* Introduce the entity of Process type.
* Set the length of event#parameters to 2000.
* Limit the length of Event#parameters.
+* Support large service/instance/networkAddressAlias list query by using
ElasticSearch scrolling API, add `metadataQueryBatchSize` to configure
scrolling page size.
+* Change default value of `metadataQueryMaxSize` from `5000` to `10000`
+* Replace deprecated Armeria API `BasicToken.of` with `AuthToken.ofBasic`.
#### UI
diff --git a/docs/en/setup/backend/configuration-vocabulary.md
b/docs/en/setup/backend/configuration-vocabulary.md
index 6b39677..87bad63 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -106,7 +106,8 @@ core|default|role|Option values:
`Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | flushInterval| Period of flush (in seconds). Does not matter whether
`bulkActions` is reached or not. INT(flushInterval * 2/3) is used for index
refresh period. | SW_STORAGE_ES_FLUSH_INTERVAL | 15 (index refresh period = 10)|
| - | - | concurrentRequests| The number of concurrent requests allowed to be
executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The maximum size of dataset when the OAP loads
cache, such as network aliases. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
-| - | - | metadataQueryMaxSize | The maximum size of metadata per query. |
SW_STORAGE_ES_QUERY_MAX_SIZE | 5000 |
+| - | - | metadataQueryMaxSize | The maximum size of metadata per query. |
SW_STORAGE_ES_QUERY_MAX_SIZE | 10000 |
+| - | - | scrollingBatchSize | The batch size of metadata per iteration when
`metadataQueryMaxSize` or `resultWindowMaxSize` is too large to be retrieved in
a single query. | SW_STORAGE_ES_SCROLLING_BATCH_SIZE | 5000 |
| - | - | segmentQueryMaxSize | The maximum size of trace segments per query.
| SW_STORAGE_ES_QUERY_SEGMENT_SIZE | 200|
| - | - | profileTaskQueryMaxSize | The maximum size of profile task per
query. | SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE | 200|
| - | - | advanced | All settings of ElasticSearch index creation. The value
should be in JSON format. | SW_STORAGE_ES_ADVANCED | - |
diff --git
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 05b29da..cdbd875 100644
---
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -40,6 +40,7 @@ import
org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Index;
import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
@@ -254,13 +255,11 @@ public class ElasticSearchClient implements Client,
HealthCheckable {
.toArray(String[]::new);
return es.get().search(
search,
- ImmutableMap.of(
- "ignore_unavailable", true,
- "allow_no_indices", true,
- "expand_wildcards", "open"
- ),
- indexNames
- );
+ new SearchParams()
+ .ignoreUnavailable(true)
+ .allowNoIndices(true)
+ .expandWildcards("open"),
+ indexNames);
}
public SearchResponse search(String indexName, Search search) {
@@ -269,6 +268,16 @@ public class ElasticSearchClient implements Client,
HealthCheckable {
return es.get().search(search, indexName);
}
+ public SearchResponse search(String indexName, Search search, SearchParams
params) {
+ indexName = indexNameConverter.apply(indexName);
+
+ return es.get().search(search, params, indexName);
+ }
+
+ public SearchResponse scroll(Duration contextRetention, String scrollId) {
+ return es.get().scroll(contextRetention, scrollId);
+ }
+
public Optional<Document> get(String indexName, String id) {
indexName = indexNameConverter.apply(indexName);
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
index e39e53a..acb7fda 100644
---
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
@@ -31,7 +31,7 @@ import com.linecorp.armeria.client.retry.RetryingClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.SessionProtocol;
-import com.linecorp.armeria.common.auth.BasicToken;
+import com.linecorp.armeria.common.auth.AuthToken;
import com.linecorp.armeria.common.util.Exceptions;
import java.io.Closeable;
import java.io.IOException;
@@ -39,7 +39,6 @@ import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.Getter;
@@ -51,7 +50,9 @@ import
org.apache.skywalking.library.elasticsearch.client.DocumentClient;
import org.apache.skywalking.library.elasticsearch.client.IndexClient;
import org.apache.skywalking.library.elasticsearch.client.SearchClient;
import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.NodeInfo;
import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@@ -104,7 +105,7 @@ public final class ElasticSearch implements Closeable {
.maxTotalAttempts(3)
.newDecorator());
if (StringUtil.isNotBlank(username) &&
StringUtil.isNotBlank(password)) {
- builder.auth(BasicToken.of(username, password));
+ builder.auth(AuthToken.ofBasic(username, password));
}
client = builder.build();
version = new CompletableFuture<>();
@@ -170,7 +171,7 @@ public final class ElasticSearch implements Closeable {
return aliasClient;
}
- public SearchResponse search(Search search, Map<String, ?> params,
String... index) {
+ public SearchResponse search(Search search, SearchParams params, String...
index) {
return searchClient.search(search, params, index);
}
@@ -178,6 +179,14 @@ public final class ElasticSearch implements Closeable {
return search(search, null, index);
}
+ public SearchResponse scroll(Duration contextRetention, String scrollId) {
+ return searchClient.scroll(
+ Scroll.builder()
+ .contextRetention(contextRetention)
+ .scrollId(scrollId)
+ .build());
+ }
+
@Override
public void close() {
endpointGroup.removeListener(healthyEndpointListener);
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
index d585a2f..b442605 100644
---
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
@@ -22,13 +22,14 @@ import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import java.io.InputStream;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
@Slf4j
@@ -40,7 +41,7 @@ public final class SearchClient {
@SneakyThrows
public SearchResponse search(Search criteria,
- Map<String, ?> params,
+ SearchParams params,
String... index) {
final CompletableFuture<SearchResponse> future =
version.thenCompose(
@@ -72,4 +73,33 @@ public final class SearchClient {
});
return future.get();
}
+
+ @SneakyThrows
+ public SearchResponse scroll(Scroll scroll) {
+ final CompletableFuture<SearchResponse> future =
+ version.thenCompose(
+ v -> client.execute(v.requestFactory().search().scroll(scroll))
+ .aggregate().thenApply(response -> {
+ if (response.status() != HttpStatus.OK) {
+ throw new RuntimeException(response.contentUtf8());
+ }
+
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ return v.codec().decode(is, SearchResponse.class);
+ } catch (Exception e) {
+ return Exceptions.throwUnsafely(e);
+ }
+ }));
+ future.whenComplete((result, exception) -> {
+ if (exception != null) {
+ log.error("Failed to scroll, request {}, {}", scroll,
exception);
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Succeeded to scroll, {}", result);
+ }
+ });
+ return future.get();
+ }
}
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
index 3824a4b..51c8f5e 100644
---
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
@@ -18,14 +18,20 @@
package org.apache.skywalking.library.elasticsearch.requests.factory;
import com.linecorp.armeria.common.HttpRequest;
-import java.util.Map;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
public interface SearchFactory {
/**
* Returns a request to search documents.
*/
- HttpRequest search(Search search, Map<String, ?> queryParams, String...
index);
+ HttpRequest search(Search search, SearchParams params, String... index);
+
+ /**
+ * Returns a request to retrieve the next batch of results for a scrolling
search.
+ */
+ HttpRequest scroll(Scroll scroll);
/**
* Returns a request to search documents.
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
index 2b4ff04..4ca7689 100644
---
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
@@ -20,13 +20,14 @@ package
org.apache.skywalking.library.elasticsearch.requests.factory.common;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestBuilder;
import com.linecorp.armeria.common.MediaType;
-import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import
org.apache.skywalking.library.elasticsearch.requests.factory.SearchFactory;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
@Slf4j
@RequiredArgsConstructor
@@ -36,7 +37,7 @@ public final class CommonSearchFactory implements
SearchFactory {
@SneakyThrows
@Override
public HttpRequest search(Search search,
- Map<String, ?> queryParams,
+ SearchParams params,
String... indices) {
final HttpRequestBuilder builder = HttpRequest.builder();
@@ -47,8 +48,8 @@ public final class CommonSearchFactory implements
SearchFactory {
.pathParam("indices", String.join(",", indices));
}
- if (queryParams != null && !queryParams.isEmpty()) {
- queryParams.forEach(builder::queryParam);
+ if (params != null) {
+ params.forEach(e -> builder.queryParam(e.getKey(), e.getValue()));
}
final byte[] content = version.codec().encode(search);
@@ -60,4 +61,18 @@ public final class CommonSearchFactory implements
SearchFactory {
return builder.content(MediaType.JSON, content)
.build();
}
+
+ @SneakyThrows
+ @Override
+ public HttpRequest scroll(Scroll scroll) {
+ final HttpRequestBuilder builder =
HttpRequest.builder().get("/_search/scroll");
+
+ final byte[] content = version.codec().encode(scroll);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Scroll request: {}", new String(content));
+ }
+
+ return builder.content(MediaType.JSON, content).build();
+ }
}
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/Scroll.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/Scroll.java
new file mode 100644
index 0000000..6cd7411
--- /dev/null
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/Scroll.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.search;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@Getter
+@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
+public final class Scroll {
+ private final String scroll;
+ @JsonProperty("scroll_id")
+ private final String scrollId;
+
+ public static ScrollBuilder builder() {
+ return new ScrollBuilder();
+ }
+}
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/ScrollBuilder.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/ScrollBuilder.java
new file mode 100644
index 0000000..1ac1e1a
--- /dev/null
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/ScrollBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.search;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import java.time.Duration;
+import com.google.common.base.Strings;
+
+public final class ScrollBuilder {
+ private Duration contextRetention;
+ private String scrollId;
+
+ public ScrollBuilder contextRetention(Duration contextRetention) {
+ checkArgument(
+ contextRetention != null && !contextRetention.isNegative()
+ && !contextRetention.isZero(),
+ "contextRetention must be positive, but was %s", contextRetention);
+ this.contextRetention = contextRetention;
+ return this;
+ }
+
+ public ScrollBuilder scrollId(String scrollId) {
+ checkArgument(!Strings.isNullOrEmpty(scrollId), "scrollId cannot be
blank");
+ this.scrollId = scrollId;
+ return this;
+ }
+
+ public Scroll build() {
+ checkState(contextRetention != null, "contextRetention is not set");
+ checkState(scrollId != null, "scrollId is not set");
+ return new Scroll(contextRetention.getSeconds() + "s", scrollId);
+ }
+}
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
new file mode 100644
index 0000000..26a9715
--- /dev/null
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.search;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public final class SearchParams implements Iterable<Map.Entry<String, Object>>
{
+ private static final String IGNORE_UNAVAILABLE = "ignore_unavailable";
+ private static final String ALLOW_NO_INDICES = "allow_no_indices";
+ private static final String EXPAND_WILDCARDS = "expand_wildcards";
+ private static final String SCROLL = "scroll";
+
+ private final Map<String, Object> params = new HashMap<>();
+
+ public SearchParams ignoreUnavailable(boolean ignoreUnavailable) {
+ params.put(IGNORE_UNAVAILABLE, ignoreUnavailable);
+ return this;
+ }
+
+ public SearchParams allowNoIndices(boolean allowNoIndices) {
+ params.put(ALLOW_NO_INDICES, allowNoIndices);
+ return this;
+ }
+
+ public SearchParams expandWildcards(String wildcards) {
+ params.put(EXPAND_WILDCARDS, wildcards);
+ return this;
+ }
+
+ public SearchParams scroll(Duration contextRetention) {
+ checkArgument(
+ contextRetention != null && !contextRetention.isNegative()
+ && !contextRetention.isZero(),
+ "contextRetention must be positive, but was %s",
+ contextRetention);
+ params.put(SCROLL, contextRetention.getSeconds() + "s");
+ return this;
+ }
+
+ @Override
+ public Iterator<Entry<String, Object>> iterator() {
+ return params.entrySet().iterator();
+ }
+}
diff --git
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
index 270ac22..746c0b1 100644
---
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.library.elasticsearch.response.search;
import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.Setter;
@@ -26,4 +27,6 @@ import lombok.Setter;
public final class SearchResponse {
private SearchHits hits = new SearchHits();
private Map<String, Object> aggregations;
+ @JsonProperty("_scroll_id")
+ private String scrollId;
}
diff --git a/oap-server/server-starter/src/main/resources/application.yml
b/oap-server/server-starter/src/main/resources/application.yml
index 8cd9909..08970d9 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -156,7 +156,8 @@ storage:
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15}
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of
concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
- metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
+ metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:10000}
+ scrollingBatchSize: ${SW_STORAGE_ES_SCROLLING_BATCH_SIZE:5000}
segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
oapAnalyzer:
${SW_STORAGE_ES_OAP_ANALYZER:"{\"analyzer\":{\"oap_analyzer\":{\"type\":\"stop\"}}}"}
# the oap analyzer.
@@ -509,4 +510,4 @@ receiver-event:
receiver-ebpf:
selector: ${SW_RECEIVER_EBPF:default}
- default:
\ No newline at end of file
+ default:
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 47a03d5..fba9f1b 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -108,6 +108,12 @@ public class StorageModuleElasticsearchConfig extends
ModuleConfig {
private String trustStorePass;
private int resultWindowMaxSize = 10000;
private int metadataQueryMaxSize = 5000;
+ /**
+ * @since 9.0.0 The batch size that is used to scroll on the large results,
+ * if {@link #metadataQueryMaxSize} is larger than the maximum result
window in
+ * ElasticSearch server, this can be used to retrieve all results.
+ */
+ private int scrollingBatchSize = 5000;
private int segmentQueryMaxSize = 200;
private int profileTaskQueryMaxSize = 200;
/**
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index b58d240..c48e3aa 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -168,15 +168,13 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new
HistoryDeleteEsDAO(elasticSearchClient));
this.registerServiceImplementation(
- INetworkAddressAliasDAO.class, new
NetworkAddressAliasEsDAO(elasticSearchClient, config
- .getResultWindowMaxSize()));
+ INetworkAddressAliasDAO.class, new
NetworkAddressAliasEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(ITopologyQueryDAO.class, new
TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new
MetricsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(
ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient,
config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(IBrowserLogQueryDAO.class, new
BrowserLogQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(
- IMetadataQueryDAO.class, new
MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new
MetadataQueryEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IAggregationQueryDAO.class, new
AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new
AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new
TopNRecordsQueryEsDAO(elasticSearchClient));
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index 0c4562d..ad21138 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import java.time.Duration;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+ protected static final Duration SCROLL_CONTEXT_RETENTION =
Duration.ofSeconds(30);
public EsDAO(ElasticSearchClient client) {
super(client);
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 5f8d1da..799f47c 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -23,20 +23,25 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import
org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
@Slf4j
public class NetworkAddressAliasEsDAO extends EsDAO implements
INetworkAddressAliasDAO {
protected final int resultWindowMaxSize;
+ protected final int scrollingBatchSize;
- public NetworkAddressAliasEsDAO(ElasticSearchClient client, int
resultWindowMaxSize) {
+ public NetworkAddressAliasEsDAO(ElasticSearchClient client,
+ StorageModuleElasticsearchConfig config) {
super(client);
- this.resultWindowMaxSize = resultWindowMaxSize;
+ this.resultWindowMaxSize = config.getResultWindowMaxSize();
+ this.scrollingBatchSize = config.getScrollingBatchSize();
}
@Override
@@ -44,19 +49,29 @@ public class NetworkAddressAliasEsDAO extends EsDAO
implements INetworkAddressAl
List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
try {
+ final int batchSize = Math.min(resultWindowMaxSize,
scrollingBatchSize);
final Search search =
Search.builder().query(
Query.range(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET)
.gte(timeBucketInMinute))
- .size(resultWindowMaxSize)
+ .size(batchSize)
.build();
-
- final SearchResponse results =
- getClient().search(NetworkAddressAlias.INDEX_NAME, search);
-
+ final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
final NetworkAddressAlias.Builder builder = new
NetworkAddressAlias.Builder();
- for (SearchHit searchHit : results.getHits()) {
-
networkAddressAliases.add(builder.storage2Entity(searchHit.getSource()));
+
+ SearchResponse results =
+ getClient().search(NetworkAddressAlias.INDEX_NAME, search,
params);
+ while (results.getHits().getTotal() > 0) {
+ for (SearchHit searchHit : results.getHits()) {
+
networkAddressAliases.add(builder.storage2Entity(searchHit.getSource()));
+ }
+ if (results.getHits().getTotal() < batchSize) {
+ break;
+ }
+ if (networkAddressAliases.size() >= resultWindowMaxSize) {
+ break;
+ }
+ results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
results.getScrollId());
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 39ccf47..e7e449c 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -29,6 +29,7 @@ import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuil
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.Layer;
@@ -44,6 +45,7 @@ import
org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.StringUtil;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
@@ -52,10 +54,14 @@ import static
org.apache.skywalking.oap.server.core.analysis.manual.instance.Ins
public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
private final int queryMaxSize;
+ private final int scrollingBatchSize;
- public MetadataQueryEsDAO(ElasticSearchClient client, int queryMaxSize) {
+ public MetadataQueryEsDAO(
+ ElasticSearchClient client,
+ StorageModuleElasticsearchConfig config) {
super(client);
- this.queryMaxSize = queryMaxSize;
+ this.queryMaxSize = config.getMetadataQueryMaxSize();
+ this.scrollingBatchSize = config.getScrollingBatchSize();
}
@Override
@@ -63,18 +69,34 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME);
+ final int batchSize = Math.min(queryMaxSize, scrollingBatchSize);
final BoolQueryBuilder query =
Query.bool();
- final SearchBuilder search =
Search.builder().query(query).size(queryMaxSize);
+ final SearchBuilder search =
Search.builder().query(query).size(batchSize);
if (StringUtil.isNotEmpty(layer)) {
query.must(Query.term(ServiceTraffic.LAYER,
Layer.valueOf(layer).value()));
}
if (StringUtil.isNotEmpty(group)) {
query.must(Query.term(ServiceTraffic.GROUP, group));
}
- final SearchResponse results = getClient().search(index,
search.build());
-
- return buildServices(results);
+ final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+ final List<Service> services = new ArrayList<>();
+
+ SearchResponse results = getClient().search(index, search.build(),
params);
+ while (results.getHits().getTotal() > 0) {
+ final List<Service> batch = buildServices(results);
+ services.addAll(batch);
+ // The last iterate, there is no more data
+ if (batch.size() < batchSize) {
+ break;
+ }
+ // We've got enough data
+ if (services.size() >= queryMaxSize) {
+ break;
+ }
+ results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
results.getScrollId());
+ }
+ return services;
}
@Override
@@ -101,10 +123,23 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
Query.bool()
.must(Query.range(InstanceTraffic.LAST_PING_TIME_BUCKET).gte(minuteTimeBucket))
.must(Query.term(InstanceTraffic.SERVICE_ID, serviceId));
- final SearchBuilder search =
Search.builder().query(query).size(queryMaxSize);
-
- final SearchResponse response = getClient().search(index,
search.build());
- return buildInstances(response);
+ final int batchSize = Math.min(queryMaxSize, scrollingBatchSize);
+ final SearchBuilder search =
Search.builder().query(query).size(batchSize);
+
+ final List<ServiceInstance> instances = new ArrayList<>();
+ SearchResponse response = getClient().search(index, search.build());
+ while (response.getHits().getTotal() > 0) {
+ final List<ServiceInstance> batch = buildInstances(response);
+ instances.addAll(batch);
+ if (batch.size() < batchSize) {
+ break;
+ }
+ if (batch.size() >= queryMaxSize) {
+ break;
+ }
+ response = getClient().scroll(SCROLL_CONTEXT_RETENTION,
response.getScrollId());
+ }
+ return instances;
}
@Override