This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch enhance/socketTimeout
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 2a2bbfd035cc2bf03ffa25e19cce4a3f9228aa23
Author: kezhenxu94 <kezhenx...@apache.org>
AuthorDate: Fri Sep 24 16:03:15 2021 +0800

    Add `socketTimeout` back to the new implementation
---
 .../library/client/elasticsearch/ElasticSearchClient.java      |  1 +
 .../skywalking/library/elasticsearch/ElasticSearchBuilder.java | 10 ++++++++++
 .../skywalking/library/elasticsearch/bulk/BulkProcessor.java   |  7 ++++++-
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index b6801bf..03a9116 100644
--- 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -117,6 +117,7 @@ public class ElasticSearchClient implements Client, 
HealthCheckable {
                 .endpoints(clusterNodes.split(","))
                 .protocol(protocol)
                 .connectTimeout(connectTimeout)
+                .socketTimeout(socketTimeout)
                 .numHttpClientThread(numHttpClientThread)
                 .healthyListener(healthy -> {
                     if (healthy) {
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
index d154f32..2566554 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
@@ -63,6 +63,8 @@ public final class ElasticSearchBuilder {
 
     private Duration connectTimeout = Duration.ofMillis(500);
 
+    private Duration socketTimeout = Duration.ofSeconds(30);
+
     private Consumer<Boolean> healthyListener;
 
     private int numHttpClientThread;
@@ -117,6 +119,13 @@ public final class ElasticSearchBuilder {
         return this;
     }
 
+
+    public ElasticSearchBuilder socketTimeout(int socketTimeout) {
+        checkArgument(socketTimeout > 0, "socketTimeout must be positive");
+        this.socketTimeout = Duration.ofMillis(socketTimeout);
+        return this;
+    }
+
     public ElasticSearchBuilder healthyListener(Consumer<Boolean> 
healthyListener) {
         requireNonNull(healthyListener, "healthyListener");
         this.healthyListener = healthyListener;
@@ -138,6 +147,7 @@ public final class ElasticSearchBuilder {
         final ClientFactoryBuilder factoryBuilder =
             ClientFactory.builder()
                          .connectTimeout(connectTimeout)
+                         .idleTimeout(socketTimeout)
                          .useHttp2Preface(false)
                          .workerGroup(numHttpClientThread > 0 ? 
numHttpClientThread : NUM_PROC);
 
diff --git 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index a8f72ec..93c49ae 100644
--- 
a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ 
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -84,9 +84,10 @@ public final class BulkProcessor {
         return this;
     }
 
+    @SneakyThrows
     private void internalAdd(Object request) {
         requireNonNull(request, "request");
-        requests.add(request);
+        requests.put(request);
         flushIfNeeded();
     }
 
@@ -120,6 +121,10 @@ public final class BulkProcessor {
     private CompletableFuture<Void> doFlush(final List<Object> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
 
+        if (batch.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         final CompletableFuture<Void> future = 
es.get().version().thenCompose(v -> {
             try {
                 final RequestFactory rf = v.requestFactory();

Reply via email to