This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7529a  Support large service list query and replace deprecated API 
of Armeria (#8628)
eb7529a is described below

commit eb7529adb6017d6509cb2c081c53e6cbc29f62c9
Author: kezhenxu94 <[email protected]>
AuthorDate: Tue Mar 8 13:18:18 2022 +0800

    Support large service list query and replace deprecated API of Armeria 
(#8628)
---
 CHANGES.md                                         |  3 +
 docs/en/setup/backend/configuration-vocabulary.md  |  3 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 23 +++++---
 .../library/elasticsearch/ElasticSearch.java       | 17 ++++--
 .../library/elasticsearch/client/SearchClient.java | 34 ++++++++++-
 .../requests/factory/SearchFactory.java            | 10 +++-
 .../factory/common/CommonSearchFactory.java        | 23 ++++++--
 .../elasticsearch/requests/search/Scroll.java      | 37 ++++++++++++
 .../requests/search/ScrollBuilder.java             | 51 +++++++++++++++++
 .../requests/search/SearchParams.java              | 66 ++++++++++++++++++++++
 .../response/search/SearchResponse.java            |  3 +
 .../src/main/resources/application.yml             |  5 +-
 .../StorageModuleElasticsearchConfig.java          |  6 ++
 .../StorageModuleElasticsearchProvider.java        |  6 +-
 .../storage/plugin/elasticsearch/base/EsDAO.java   |  2 +
 .../cache/NetworkAddressAliasEsDAO.java            | 33 ++++++++---
 .../elasticsearch/query/MetadataQueryEsDAO.java    | 55 ++++++++++++++----
 17 files changed, 332 insertions(+), 45 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0e1edb5..dc49f34 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -82,6 +82,9 @@ Release Notes.
 * Introduce the entity of Process type.
 * Set the length of event#parameters to 2000.
 * Limit the length of Event#parameters.
+* Support large service/instance/networkAddressAlias list query by using 
ElasticSearch scrolling API, add `metadataQueryBatchSize` to configure 
scrolling page size.
+* Change default value of `metadataQueryMaxSize` from `5000` to `10000`
+* Replace deprecated Armeria API `BasicToken.of` with `AuthToken.ofBasic`.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md 
b/docs/en/setup/backend/configuration-vocabulary.md
index 6b39677..87bad63 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -106,7 +106,8 @@ core|default|role|Option values: 
`Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | flushInterval| Period of flush (in seconds). Does not matter whether 
`bulkActions` is reached or not. INT(flushInterval * 2/3) is used for index 
refresh period. | SW_STORAGE_ES_FLUSH_INTERVAL | 15 (index refresh period = 10)|
 | - | - | concurrentRequests| The number of concurrent requests allowed to be 
executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
 | - | - | resultWindowMaxSize | The maximum size of dataset when the OAP loads 
cache, such as network aliases. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
-| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | 
SW_STORAGE_ES_QUERY_MAX_SIZE | 5000 |
+| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | 
SW_STORAGE_ES_QUERY_MAX_SIZE | 10000 |
+| - | - | scrollingBatchSize | The batch size of metadata per iteration when 
`metadataQueryMaxSize` or `resultWindowMaxSize` is too large to be retrieved in 
a single query. | SW_STORAGE_ES_SCROLLING_BATCH_SIZE | 5000 |
 | - | - | segmentQueryMaxSize | The maximum size of trace segments per query. 
| SW_STORAGE_ES_QUERY_SEGMENT_SIZE | 200|
 | - | - | profileTaskQueryMaxSize | The maximum size of profile task per 
query. | SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE | 200|
 | - | - | advanced | All settings of ElasticSearch index creation. The value 
should be in JSON format. | SW_STORAGE_ES_ADVANCED | - |
diff --git 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 05b29da..cdbd875 100644
--- 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -40,6 +40,7 @@ import 
org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.response.Document;
 import org.apache.skywalking.library.elasticsearch.response.Index;
 import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
@@ -254,13 +255,11 @@ public class ElasticSearchClient implements Client, 
HealthCheckable {
                   .toArray(String[]::new);
         return es.get().search(
             search,
-            ImmutableMap.of(
-                "ignore_unavailable", true,
-                "allow_no_indices", true,
-                "expand_wildcards", "open"
-            ),
-            indexNames
-        );
+            new SearchParams()
+                .ignoreUnavailable(true)
+                .allowNoIndices(true)
+                .expandWildcards("open"),
+            indexNames);
     }
 
     public SearchResponse search(String indexName, Search search) {
@@ -269,6 +268,16 @@ public class ElasticSearchClient implements Client, 
HealthCheckable {
         return es.get().search(search, indexName);
     }
 
+    public SearchResponse search(String indexName, Search search, SearchParams 
params) {
+        indexName = indexNameConverter.apply(indexName);
+
+        return es.get().search(search, params, indexName);
+    }
+
+    public SearchResponse scroll(Duration contextRetention, String scrollId) {
+        return es.get().scroll(contextRetention, scrollId);
+    }
+
     public Optional<Document> get(String indexName, String id) {
         indexName = indexNameConverter.apply(indexName);
 
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
index e39e53a..acb7fda 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
@@ -31,7 +31,7 @@ import com.linecorp.armeria.client.retry.RetryingClient;
 import com.linecorp.armeria.common.HttpData;
 import com.linecorp.armeria.common.HttpStatus;
 import com.linecorp.armeria.common.SessionProtocol;
-import com.linecorp.armeria.common.auth.BasicToken;
+import com.linecorp.armeria.common.auth.AuthToken;
 import com.linecorp.armeria.common.util.Exceptions;
 import java.io.Closeable;
 import java.io.IOException;
@@ -39,7 +39,6 @@ import java.io.InputStream;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import lombok.Getter;
@@ -51,7 +50,9 @@ import 
org.apache.skywalking.library.elasticsearch.client.DocumentClient;
 import org.apache.skywalking.library.elasticsearch.client.IndexClient;
 import org.apache.skywalking.library.elasticsearch.client.SearchClient;
 import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.response.NodeInfo;
 import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
@@ -104,7 +105,7 @@ public final class ElasticSearch implements Closeable {
                                               .maxTotalAttempts(3)
                                               .newDecorator());
         if (StringUtil.isNotBlank(username) && 
StringUtil.isNotBlank(password)) {
-            builder.auth(BasicToken.of(username, password));
+            builder.auth(AuthToken.ofBasic(username, password));
         }
         client = builder.build();
         version = new CompletableFuture<>();
@@ -170,7 +171,7 @@ public final class ElasticSearch implements Closeable {
         return aliasClient;
     }
 
-    public SearchResponse search(Search search, Map<String, ?> params, 
String... index) {
+    public SearchResponse search(Search search, SearchParams params, String... 
index) {
         return searchClient.search(search, params, index);
     }
 
@@ -178,6 +179,14 @@ public final class ElasticSearch implements Closeable {
         return search(search, null, index);
     }
 
+    public SearchResponse scroll(Duration contextRetention, String scrollId) {
+        return searchClient.scroll(
+            Scroll.builder()
+                  .contextRetention(contextRetention)
+                  .scrollId(scrollId)
+                  .build());
+    }
+
     @Override
     public void close() {
         endpointGroup.removeListener(healthyEndpointListener);
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
index d585a2f..b442605 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
@@ -22,13 +22,14 @@ import com.linecorp.armeria.common.HttpData;
 import com.linecorp.armeria.common.HttpStatus;
 import com.linecorp.armeria.common.util.Exceptions;
 import java.io.InputStream;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 
 @Slf4j
@@ -40,7 +41,7 @@ public final class SearchClient {
 
     @SneakyThrows
     public SearchResponse search(Search criteria,
-                                 Map<String, ?> params,
+                                 SearchParams params,
                                  String... index) {
         final CompletableFuture<SearchResponse> future =
             version.thenCompose(
@@ -72,4 +73,33 @@ public final class SearchClient {
         });
         return future.get();
     }
+
+    @SneakyThrows
+    public SearchResponse scroll(Scroll scroll) {
+        final CompletableFuture<SearchResponse> future =
+            version.thenCompose(
+                v -> client.execute(v.requestFactory().search().scroll(scroll))
+                           .aggregate().thenApply(response -> {
+                        if (response.status() != HttpStatus.OK) {
+                            throw new RuntimeException(response.contentUtf8());
+                        }
+
+                        try (final HttpData content = response.content();
+                             final InputStream is = content.toInputStream()) {
+                            return v.codec().decode(is, SearchResponse.class);
+                        } catch (Exception e) {
+                            return Exceptions.throwUnsafely(e);
+                        }
+                    }));
+        future.whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.error("Failed to scroll, request {}, {}", scroll, 
exception);
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Succeeded to scroll, {}", result);
+            }
+        });
+        return future.get();
+    }
 }
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
index 3824a4b..51c8f5e 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
@@ -18,14 +18,20 @@
 package org.apache.skywalking.library.elasticsearch.requests.factory;
 
 import com.linecorp.armeria.common.HttpRequest;
-import java.util.Map;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 
 public interface SearchFactory {
     /**
      * Returns a request to search documents.
      */
-    HttpRequest search(Search search, Map<String, ?> queryParams, String... 
index);
+    HttpRequest search(Search search, SearchParams params, String... index);
+
+    /**
+     * Returns a request to retrieve the next batch of results for a scrolling 
search.
+     */
+    HttpRequest scroll(Scroll scroll);
 
     /**
      * Returns a request to search documents.
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
index 2b4ff04..4ca7689 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
@@ -20,13 +20,14 @@ package 
org.apache.skywalking.library.elasticsearch.requests.factory.common;
 import com.linecorp.armeria.common.HttpRequest;
 import com.linecorp.armeria.common.HttpRequestBuilder;
 import com.linecorp.armeria.common.MediaType;
-import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import 
org.apache.skywalking.library.elasticsearch.requests.factory.SearchFactory;
+import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -36,7 +37,7 @@ public final class CommonSearchFactory implements 
SearchFactory {
     @SneakyThrows
     @Override
     public HttpRequest search(Search search,
-                              Map<String, ?> queryParams,
+                              SearchParams params,
                               String... indices) {
         final HttpRequestBuilder builder = HttpRequest.builder();
 
@@ -47,8 +48,8 @@ public final class CommonSearchFactory implements 
SearchFactory {
                    .pathParam("indices", String.join(",", indices));
         }
 
-        if (queryParams != null && !queryParams.isEmpty()) {
-            queryParams.forEach(builder::queryParam);
+        if (params != null) {
+            params.forEach(e -> builder.queryParam(e.getKey(), e.getValue()));
         }
 
         final byte[] content = version.codec().encode(search);
@@ -60,4 +61,18 @@ public final class CommonSearchFactory implements 
SearchFactory {
         return builder.content(MediaType.JSON, content)
                       .build();
     }
+
+    @SneakyThrows
+    @Override
+    public HttpRequest scroll(Scroll scroll) {
+        final HttpRequestBuilder builder = 
HttpRequest.builder().get("/_search/scroll");
+
+        final byte[] content = version.codec().encode(scroll);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Scroll request: {}", new String(content));
+        }
+
+        return builder.content(MediaType.JSON, content).build();
+    }
 }
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/Scroll.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/Scroll.java
new file mode 100644
index 0000000..6cd7411
--- /dev/null
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/Scroll.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.search;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@Getter
+@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
+public final class Scroll {
+    private final String scroll;
+    @JsonProperty("scroll_id")
+    private final String scrollId;
+
+    public static ScrollBuilder builder() {
+        return new ScrollBuilder();
+    }
+}
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/ScrollBuilder.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/ScrollBuilder.java
new file mode 100644
index 0000000..1ac1e1a
--- /dev/null
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/ScrollBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.search;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import java.time.Duration;
+import com.google.common.base.Strings;
+
+public final class ScrollBuilder {
+    private Duration contextRetention;
+    private String scrollId;
+
+    public ScrollBuilder contextRetention(Duration contextRetention) {
+        checkArgument(
+            contextRetention != null && !contextRetention.isNegative()
+                && !contextRetention.isZero(),
+            "contextRetention must be positive, but was %s", contextRetention);
+        this.contextRetention = contextRetention;
+        return this;
+    }
+
+    public ScrollBuilder scrollId(String scrollId) {
+        checkArgument(!Strings.isNullOrEmpty(scrollId), "scrollId cannot be 
blank");
+        this.scrollId = scrollId;
+        return this;
+    }
+
+    public Scroll build() {
+        checkState(contextRetention != null, "contextRetention is not set");
+        checkState(scrollId != null, "scrollId is not set");
+        return new Scroll(contextRetention.getSeconds() + "s", scrollId);
+    }
+}
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
new file mode 100644
index 0000000..26a9715
--- /dev/null
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.search;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public final class SearchParams implements Iterable<Map.Entry<String, Object>> 
{
+    private static final String IGNORE_UNAVAILABLE = "ignore_unavailable";
+    private static final String ALLOW_NO_INDICES = "allow_no_indices";
+    private static final String EXPAND_WILDCARDS = "expand_wildcards";
+    private static final String SCROLL = "scroll";
+
+    private final Map<String, Object> params = new HashMap<>();
+
+    public SearchParams ignoreUnavailable(boolean ignoreUnavailable) {
+        params.put(IGNORE_UNAVAILABLE, ignoreUnavailable);
+        return this;
+    }
+
+    public SearchParams allowNoIndices(boolean allowNoIndices) {
+        params.put(ALLOW_NO_INDICES, allowNoIndices);
+        return this;
+    }
+
+    public SearchParams expandWildcards(String wildcards) {
+        params.put(EXPAND_WILDCARDS, wildcards);
+        return this;
+    }
+
+    public SearchParams scroll(Duration contextRetention) {
+        checkArgument(
+            contextRetention != null && !contextRetention.isNegative()
+                && !contextRetention.isZero(),
+            "contextRetention must be positive, but was %s",
+            contextRetention);
+        params.put(SCROLL, contextRetention.getSeconds() + "s");
+        return this;
+    }
+
+    @Override
+    public Iterator<Entry<String, Object>> iterator() {
+        return params.entrySet().iterator();
+    }
+}
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
index 270ac22..746c0b1 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/search/SearchResponse.java
@@ -18,6 +18,7 @@
 package org.apache.skywalking.library.elasticsearch.response.search;
 
 import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -26,4 +27,6 @@ import lombok.Setter;
 public final class SearchResponse {
     private SearchHits hits = new SearchHits();
     private Map<String, Object> aggregations;
+    @JsonProperty("_scroll_id")
+    private String scrollId;
 }
diff --git a/oap-server/server-starter/src/main/resources/application.yml 
b/oap-server/server-starter/src/main/resources/application.yml
index 8cd9909..08970d9 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -156,7 +156,8 @@ storage:
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15}
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of 
concurrent requests
     resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
-    metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
+    metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:10000}
+    scrollingBatchSize: ${SW_STORAGE_ES_SCROLLING_BATCH_SIZE:5000}
     segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
     profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
     oapAnalyzer: 
${SW_STORAGE_ES_OAP_ANALYZER:"{\"analyzer\":{\"oap_analyzer\":{\"type\":\"stop\"}}}"}
 # the oap analyzer.
@@ -509,4 +510,4 @@ receiver-event:
 
 receiver-ebpf:
   selector: ${SW_RECEIVER_EBPF:default}
-  default:
\ No newline at end of file
+  default:
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 47a03d5..fba9f1b 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -108,6 +108,12 @@ public class StorageModuleElasticsearchConfig extends 
ModuleConfig {
     private String trustStorePass;
     private int resultWindowMaxSize = 10000;
     private int metadataQueryMaxSize = 5000;
+    /**
+     * @since 9.0.0 The batch size that is used to scroll on the large results,
+     * if {@link #metadataQueryMaxSize} is larger than the maximum result 
window in
+     * ElasticSearch server, this can be used to retrieve all results.
+     */
+    private int scrollingBatchSize = 5000;
     private int segmentQueryMaxSize = 200;
     private int profileTaskQueryMaxSize = 200;
     /**
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index b58d240..c48e3aa 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -168,15 +168,13 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
         this.registerServiceImplementation(
             IHistoryDeleteDAO.class, new 
HistoryDeleteEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-            INetworkAddressAliasDAO.class, new 
NetworkAddressAliasEsDAO(elasticSearchClient, config
-                .getResultWindowMaxSize()));
+            INetworkAddressAliasDAO.class, new 
NetworkAddressAliasEsDAO(elasticSearchClient, config));
         this.registerServiceImplementation(ITopologyQueryDAO.class, new 
TopologyQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IMetricsQueryDAO.class, new 
MetricsQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
             ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, 
config.getSegmentQueryMaxSize()));
         this.registerServiceImplementation(IBrowserLogQueryDAO.class, new 
BrowserLogQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(
-            IMetadataQueryDAO.class, new 
MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+        this.registerServiceImplementation(IMetadataQueryDAO.class, new 
MetadataQueryEsDAO(elasticSearchClient, config));
         this.registerServiceImplementation(IAggregationQueryDAO.class, new 
AggregationQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new 
AlarmQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new 
TopNRecordsQueryEsDAO(elasticSearchClient));
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index 0c4562d..ad21138 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -18,10 +18,12 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
+import java.time.Duration;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 
 public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+    protected static final Duration SCROLL_CONTEXT_RETENTION = 
Duration.ofSeconds(30);
 
     public EsDAO(ElasticSearchClient client) {
         super(client);
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 5f8d1da..799f47c 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -23,20 +23,25 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
 import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
 import 
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 
 @Slf4j
 public class NetworkAddressAliasEsDAO extends EsDAO implements 
INetworkAddressAliasDAO {
     protected final int resultWindowMaxSize;
+    protected final int scrollingBatchSize;
 
-    public NetworkAddressAliasEsDAO(ElasticSearchClient client, int 
resultWindowMaxSize) {
+    public NetworkAddressAliasEsDAO(ElasticSearchClient client,
+                                    StorageModuleElasticsearchConfig config) {
         super(client);
-        this.resultWindowMaxSize = resultWindowMaxSize;
+        this.resultWindowMaxSize = config.getResultWindowMaxSize();
+        this.scrollingBatchSize = config.getScrollingBatchSize();
     }
 
     @Override
@@ -44,19 +49,29 @@ public class NetworkAddressAliasEsDAO extends EsDAO 
implements INetworkAddressAl
         List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
 
         try {
+            final int batchSize = Math.min(resultWindowMaxSize, 
scrollingBatchSize);
             final Search search =
                 Search.builder().query(
                           
Query.range(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET)
                                .gte(timeBucketInMinute))
-                      .size(resultWindowMaxSize)
+                      .size(batchSize)
                       .build();
-
-            final SearchResponse results =
-                getClient().search(NetworkAddressAlias.INDEX_NAME, search);
-
+            final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
             final NetworkAddressAlias.Builder builder = new 
NetworkAddressAlias.Builder();
-            for (SearchHit searchHit : results.getHits()) {
-                
networkAddressAliases.add(builder.storage2Entity(searchHit.getSource()));
+            
+            SearchResponse results =
+                getClient().search(NetworkAddressAlias.INDEX_NAME, search, 
params);
+            while (results.getHits().getTotal() > 0) {
+                for (SearchHit searchHit : results.getHits()) {
+                    
networkAddressAliases.add(builder.storage2Entity(searchHit.getSource()));
+                }
+                if (results.getHits().getTotal() < batchSize) {
+                    break;
+                }
+                if (networkAddressAliases.size() >= resultWindowMaxSize) {
+                    break;
+                }
+                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
results.getScrollId());
             }
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 39ccf47..e7e449c 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -29,6 +29,7 @@ import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuil
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
+import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
 import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.Layer;
@@ -44,6 +45,7 @@ import 
org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
 import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
@@ -52,10 +54,14 @@ import static 
org.apache.skywalking.oap.server.core.analysis.manual.instance.Ins
 
 public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
     private final int queryMaxSize;
+    private final int scrollingBatchSize;
 
-    public MetadataQueryEsDAO(ElasticSearchClient client, int queryMaxSize) {
+    public MetadataQueryEsDAO(
+        ElasticSearchClient client,
+        StorageModuleElasticsearchConfig config) {
         super(client);
-        this.queryMaxSize = queryMaxSize;
+        this.queryMaxSize = config.getMetadataQueryMaxSize();
+        this.scrollingBatchSize = config.getScrollingBatchSize();
     }
 
     @Override
@@ -63,18 +69,34 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME);
 
+        final int batchSize = Math.min(queryMaxSize, scrollingBatchSize);
         final BoolQueryBuilder query =
             Query.bool();
-        final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
+        final SearchBuilder search = 
Search.builder().query(query).size(batchSize);
         if (StringUtil.isNotEmpty(layer)) {
             query.must(Query.term(ServiceTraffic.LAYER, 
Layer.valueOf(layer).value()));
         }
         if (StringUtil.isNotEmpty(group)) {
             query.must(Query.term(ServiceTraffic.GROUP, group));
         }
-        final SearchResponse results = getClient().search(index, 
search.build());
-
-        return buildServices(results);
+        final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+        final List<Service> services = new ArrayList<>();
+
+        SearchResponse results = getClient().search(index, search.build(), 
params);
+        while (results.getHits().getTotal() > 0) {
+            final List<Service> batch = buildServices(results);
+            services.addAll(batch);
+            // The last iterate, there is no more data
+            if (batch.size() < batchSize) {
+                break;
+            }
+            // We've got enough data
+            if (services.size() >= queryMaxSize) {
+                break;
+            }
+            results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
results.getScrollId());
+        }
+        return services;
     }
 
     @Override
@@ -101,10 +123,23 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
             Query.bool()
                  
.must(Query.range(InstanceTraffic.LAST_PING_TIME_BUCKET).gte(minuteTimeBucket))
                  .must(Query.term(InstanceTraffic.SERVICE_ID, serviceId));
-        final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
-
-        final SearchResponse response = getClient().search(index, 
search.build());
-        return buildInstances(response);
+        final int batchSize = Math.min(queryMaxSize, scrollingBatchSize);
+        final SearchBuilder search = 
Search.builder().query(query).size(batchSize);
+
+        final List<ServiceInstance> instances = new ArrayList<>();
+        SearchResponse response = getClient().search(index, search.build());
+        while (response.getHits().getTotal() > 0) {
+            final List<ServiceInstance> batch = buildInstances(response);
+            instances.addAll(batch);
+            if (batch.size() < batchSize) {
+                break;
+            }
+            if (batch.size() >= queryMaxSize) {
+                break;
+            }
+            response = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
response.getScrollId());
+        }
+        return instances;
     }
 
     @Override

Reply via email to