This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch es-bulk in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 304c5660f409a30ee7d7d37114ca5ea547bf3c53
Author: Wu Sheng <[email protected]>
AuthorDate: Thu Dec 8 19:59:06 2022 +0800

    Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodical flush in continuous bulk stream.
---
 docs/en/changes/changes.md                           |  1 +
 .../library/elasticsearch/bulk/BulkProcessor.java    | 20 ++++++++++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 3ec5e64dac..e311e153fc 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -33,6 +33,7 @@
 * Remove the dependency of `refresh_interval` of ElasticSearch indices from `elasticsearch/flushInterval` config. Now,
   it uses `core/persistentPeriod` + 5s as `refresh_interval` for all indices instead.
 * Change `elasticsearch/flushInterval` to 5s(was 15s).
+* Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodical flush in continuous bulk stream.
 
 #### UI
 
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index 6841cb5fb6..eae56c39b5 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -48,6 +48,8 @@ public final class BulkProcessor {
     private final AtomicReference<ElasticSearch> es;
     private final int bulkActions;
     private final Semaphore semaphore;
+    private final long flushInternalInMillis;
+    private volatile long lastFlushTS = 0;
 
     public static BulkProcessorBuilder builder() {
         return new BulkProcessorBuilder();
@@ -72,9 +74,12 @@ public final class BulkProcessor {
         scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
         scheduler.setRemoveOnCancelPolicy(true);
+        flushInternalInMillis = flushInterval.getSeconds() * 1000;
         scheduler.scheduleWithFixedDelay(
-            new RunnableWithExceptionProtection(this::flush,
-                                                t -> log.error("flush data to ES failure:", t)), 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
+            new RunnableWithExceptionProtection(
+                this::doPeriodicalFlush,
+                t -> log.error("flush data to ES failure:", t)
+            ), 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
     }
 
     public CompletableFuture<Void> add(IndexRequest request) {
@@ -101,6 +106,15 @@ public final class BulkProcessor {
         }
     }
 
+    private void doPeriodicalFlush() {
+        if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) {
+            // Run periodical flush if there is no `flushIfNeeded` executed in the second half of the flush period.
+            // Otherwise, wait for next round. By default, last 2 seconds of 5s period.
+            // This could avoid periodical flush running among bulks(controlled by bulkActions).
+            flush();
+        }
+    }
+
     public void flush() {
         if (requests.isEmpty()) {
             return;
@@ -113,6 +127,8 @@ public final class BulkProcessor {
             return;
         }
 
+        lastFlushTS = System.currentTimeMillis();
+
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
 
