This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 174b8d6ef0 Enhance #9437 fix to avoid too many small flushing. (#9440)
174b8d6ef0 is described below
commit 174b8d6ef06b8929eba42d961d6ac6e18a8dab9c
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Tue Aug 9 07:28:17 2022 +0800
Enhance #9437 fix to avoid too many small flushing. (#9440)
Move the forcedly flush out of each worker's #flush, which would only make
the left data flushing into the storage ASAP. In my previous fix, it would
increase the number of flushing as the number of workers, which is unnecessary
too.
---
.../oap/server/core/storage/IBatchDAO.java | 10 +++++++++
.../oap/server/core/storage/PersistenceTimer.java | 1 +
.../elasticsearch/base/BatchProcessEsDAO.java | 25 +++++++++++-----------
3 files changed, 24 insertions(+), 12 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index f7279f276a..139bce395b 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -45,4 +45,14 @@ public interface IBatchDAO extends DAO {
* @param prepareRequests data to insert or update. No delete happens in
streaming mode.
*/
CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests);
+
+ /**
+ * End of flush is an event to notify the whole flush period is ending.
+ * This provides a time point to do clean up works.
+ *
+ * @since 9.2.0
+ */
+ default void endOfFlush() {
+
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 3af6a780bb..6bf7ed779c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -136,6 +136,7 @@ public enum PersistenceTimer {
.whenComplete(($1, $2) ->
executeLatencyTimer.close());
}, prepareExecutorService);
}).toArray(CompletableFuture[]::new));
+ batchDAO.endOfFlush();
future.whenComplete((unused, throwable) -> {
allTimer.close();
if (log.isDebugEnabled()) {
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 1a4aaa52ac..cb8bdce62f 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -73,19 +73,20 @@ public class BatchProcessEsDAO extends EsDAO implements
IBatchDAO {
}
if (CollectionUtils.isNotEmpty(prepareRequests)) {
- try {
- return
CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
- if (prepareRequest instanceof InsertRequest) {
- return bulkProcessor.add(((IndexRequestWrapper)
prepareRequest).getRequest());
- } else {
- return bulkProcessor.add(((UpdateRequestWrapper)
prepareRequest).getRequest());
- }
- }).toArray(CompletableFuture[]::new));
- } finally {
- // Flush forcedly due to this kind of metrics has been pushed
into the bulk processor.
- bulkProcessor.flush();
- }
+ return
CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
+ if (prepareRequest instanceof InsertRequest) {
+ return bulkProcessor.add(((IndexRequestWrapper)
prepareRequest).getRequest());
+ } else {
+ return bulkProcessor.add(((UpdateRequestWrapper)
prepareRequest).getRequest());
+ }
+ }).toArray(CompletableFuture[]::new));
}
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public void endOfFlush() {
+ // Flush forcedly due to this kind of metrics has been pushed into the
bulk processor.
+ bulkProcessor.flush();
+ }
}