This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch enhance/socketTimeout in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 2a2bbfd035cc2bf03ffa25e19cce4a3f9228aa23 Author: kezhenxu94 <kezhenx...@apache.org> AuthorDate: Fri Sep 24 16:03:15 2021 +0800 Add `socketTimeout` back to the new implementation --- .../library/client/elasticsearch/ElasticSearchClient.java | 1 + .../skywalking/library/elasticsearch/ElasticSearchBuilder.java | 10 ++++++++++ .../skywalking/library/elasticsearch/bulk/BulkProcessor.java | 7 ++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index b6801bf..03a9116 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -117,6 +117,7 @@ public class ElasticSearchClient implements Client, HealthCheckable { .endpoints(clusterNodes.split(",")) .protocol(protocol) .connectTimeout(connectTimeout) + .socketTimeout(socketTimeout) .numHttpClientThread(numHttpClientThread) .healthyListener(healthy -> { if (healthy) { diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java index d154f32..2566554 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java @@ -63,6 +63,8 @@ public final class ElasticSearchBuilder { private Duration connectTimeout = Duration.ofMillis(500); + private Duration socketTimeout = Duration.ofSeconds(30); + private Consumer<Boolean> healthyListener; private int numHttpClientThread; @@ -117,6 +119,13 @@ public final class ElasticSearchBuilder { return this; } + + public ElasticSearchBuilder socketTimeout(int socketTimeout) { + checkArgument(socketTimeout > 0, "socketTimeout must be positive"); + this.socketTimeout = Duration.ofMillis(socketTimeout); + return this; + } + public ElasticSearchBuilder healthyListener(Consumer<Boolean> healthyListener) { requireNonNull(healthyListener, "healthyListener"); this.healthyListener = healthyListener; @@ -138,6 +147,7 @@ public final class ElasticSearchBuilder { final ClientFactoryBuilder factoryBuilder = ClientFactory.builder() .connectTimeout(connectTimeout) + .idleTimeout(socketTimeout) .useHttp2Preface(false) .workerGroup(numHttpClientThread > 0 ? numHttpClientThread : NUM_PROC); diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java index a8f72ec..93c49ae 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java @@ -84,9 +84,10 @@ public final class BulkProcessor { return this; } + @SneakyThrows private void internalAdd(Object request) { requireNonNull(request, "request"); - requests.add(request); + requests.put(request); flushIfNeeded(); } @@ -120,6 +121,10 @@ public final class BulkProcessor { private CompletableFuture<Void> doFlush(final List<Object> batch) { log.debug("Executing bulk with {} requests", batch.size()); + if (batch.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + final CompletableFuture<Void> future = es.get().version().thenCompose(v -> { try { final RequestFactory rf = v.requestFactory();