This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch es-bulk
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 304c5660f409a30ee7d7d37114ca5ea547bf3c53
Author: Wu Sheng <[email protected]>
AuthorDate: Thu Dec 8 19:59:06 2022 +0800

    Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodic flushes in a continuous bulk stream.
---
 docs/en/changes/changes.md                           |  1 +
 .../library/elasticsearch/bulk/BulkProcessor.java    | 20 ++++++++++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 3ec5e64dac..e311e153fc 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -33,6 +33,7 @@
 * Remove the dependency of `refresh_interval` of ElasticSearch indices from `elasticsearch/flushInterval` config. Now,
   it uses `core/persistentPeriod` + 5s as `refresh_interval` for all indices instead.
 * Change `elasticsearch/flushInterval` to 5s(was 15s).
+* Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodic flushes in a continuous bulk stream.
 
 #### UI
 
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index 6841cb5fb6..eae56c39b5 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -48,6 +48,8 @@ public final class BulkProcessor {
     private final AtomicReference<ElasticSearch> es;
     private final int bulkActions;
     private final Semaphore semaphore;
+    private final long flushInternalInMillis;
+    private volatile long lastFlushTS = 0;
 
     public static BulkProcessorBuilder builder() {
         return new BulkProcessorBuilder();
@@ -72,9 +74,12 @@ public final class BulkProcessor {
         scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
         scheduler.setRemoveOnCancelPolicy(true);
+        flushInternalInMillis = flushInterval.getSeconds() * 1000;
         scheduler.scheduleWithFixedDelay(
-                new RunnableWithExceptionProtection(this::flush,
-                        t -> log.error("flush data to ES failure:", t)), 0, 
flushInterval.getSeconds(), TimeUnit.SECONDS);
+            new RunnableWithExceptionProtection(
+                this::doPeriodicalFlush,
+                t -> log.error("flush data to ES failure:", t)
+            ), 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
     }
 
     public CompletableFuture<Void> add(IndexRequest request) {
@@ -101,6 +106,15 @@ public final class BulkProcessor {
         }
     }
 
+    private void doPeriodicalFlush() {
+        if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 
2) {
+            // Run the periodical flush only if no flush (e.g. one triggered by `flushIfNeeded`) ran in the second half of the flush period.
+            // Otherwise, wait for the next round. With the default 5s interval, that window is the last 2.5 seconds.
+            // This avoids a periodical flush running in the middle of bulks (controlled by bulkActions).
+            flush();
+        }
+    }
+
     public void flush() {
         if (requests.isEmpty()) {
             return;
@@ -113,6 +127,8 @@ public final class BulkProcessor {
             return;
         }
 
+        lastFlushTS = System.currentTimeMillis();
+
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
 

Reply via email to