This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 810d54bef [improve] Optimize the scheduling logic for batch flush
tasks (#3660)
810d54bef is described below
commit 810d54bef561c4e8720622691383675f94df9092
Author: Cyanty <[email protected]>
AuthorDate: Sun Aug 17 14:24:48 2025 +0800
[improve] Optimize the scheduling logic for batch flush tasks (#3660)
Signed-off-by: Cyanty <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Sherlock Yin <[email protected]>
Co-authored-by: Calvin <[email protected]>
---
.../tsdb/vm/VictoriaMetricsDataStorage.java | 56 +++++++---
.../tsdb/vm/VictoriaMetricsDataStorageTest.java | 116 +++++++++++++--------
2 files changed, 112 insertions(+), 60 deletions(-)
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
index 1736b7f2f..ab304497b 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPOutputStream;
import com.google.common.collect.Maps;
@@ -106,8 +107,8 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
private final
BlockingQueue<VictoriaMetricsDataStorage.VictoriaMetricsContent>
metricsBufferQueue;
private HashedWheelTimer metricsFlushTimer = null;
- private MetricsFlushTask metricsFlushtask = null;
private final VictoriaMetricsProperties.InsertConfig insertConfig;
+ private final AtomicBoolean draining = new AtomicBoolean(false);
public VictoriaMetricsDataStorage(VictoriaMetricsProperties
victoriaMetricsProperties, RestTemplate restTemplate) {
if (victoriaMetricsProperties == null) {
@@ -129,8 +130,8 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
thread.setDaemon(true);
return thread;
}, 1, TimeUnit.SECONDS, 512);
- metricsFlushtask = new MetricsFlushTask();
- this.metricsFlushTimer.newTimeout(metricsFlushtask, 0,
TimeUnit.SECONDS);
+ // start flush interval timer
+ this.metricsFlushTimer.newTimeout(new MetricsFlushTask(null),
insertConfig.flushInterval(), TimeUnit.SECONDS);
}
private boolean checkVictoriaMetricsDatasourceAvailable() {
@@ -591,36 +592,63 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
}
}
// Refresh in advance to avoid waiting
- if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8) {
+ if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8
+ && draining.compareAndSet(false, true)) {
triggerImmediateFlush();
}
}
private void triggerImmediateFlush() {
- metricsFlushTimer.newTimeout(metricsFlushtask, 0,
TimeUnit.MILLISECONDS);
+ List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch = new
ArrayList<>(insertConfig.bufferSize());
+ metricsBufferQueue.drainTo(batch, insertConfig.bufferSize());
+ draining.set(false);
+ if (!batch.isEmpty()) {
+ metricsFlushTimer.newTimeout(new MetricsFlushTask(batch), 0,
TimeUnit.MILLISECONDS);
+ }
}
/**
* Regularly refresh the buffer queue to the vm
*/
private class MetricsFlushTask implements TimerTask {
+ private final List<VictoriaMetricsDataStorage.VictoriaMetricsContent>
batch;
+
+ public
MetricsFlushTask(List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch)
{
+ this.batch = batch;
+ }
+
@Override
public void run(Timeout timeout) {
try {
- List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch
= new ArrayList<>(insertConfig.bufferSize());
- metricsBufferQueue.drainTo(batch, insertConfig.bufferSize());
- if (!batch.isEmpty()) {
- doSaveData(batch);
- log.debug("[Victoria Metrics] Flushed {} metrics items",
batch.size());
- }
- if (metricsFlushTimer != null && !metricsFlushTimer.isStop()) {
- metricsFlushTimer.newTimeout(this,
insertConfig.flushInterval(), TimeUnit.SECONDS);
- log.debug("[Victoria Metrics] Rescheduled next flush task
in {} seconds.", insertConfig.flushInterval());
+ if (batch == null) {
+ // If the batch is null, it means that this task was
triggered by the flush interval timer
+ List<VictoriaMetricsDataStorage.VictoriaMetricsContent>
batchT = new ArrayList<>(insertConfig.bufferSize());
+ metricsBufferQueue.drainTo(batchT,
insertConfig.bufferSize());
+ triggerDoSaveData(batchT);
+ // Reschedule the next flush task
+ triggerIntervalFlushTimer();
+ } else {
+ // If the batch is not null, it means that this task was
triggered by an immediate flush
+ triggerDoSaveData(batch);
}
} catch (Exception e) {
log.error("[VictoriaMetrics] flush task error: {}",
e.getMessage(), e);
}
}
+
+ private void triggerDoSaveData(List<VictoriaMetricsContent> batch) {
+ if (!batch.isEmpty()) {
+ doSaveData(batch);
+ log.debug("[Victoria Metrics] Flushed {} metrics items",
batch.size());
+ }
+ }
+
+ private void triggerIntervalFlushTimer() {
+ if (metricsFlushTimer != null && !metricsFlushTimer.isStop()) {
+ metricsFlushTimer.newTimeout(new MetricsFlushTask(null),
insertConfig.flushInterval(), TimeUnit.SECONDS);
+ log.debug("[Victoria Metrics] Rescheduled next flush task in
{} seconds.", insertConfig.flushInterval());
+ }
+ }
}
/**
diff --git
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
index 37957499e..f11fdb905 100644
---
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
+++
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
@@ -21,9 +21,8 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.startsWith;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
+import static org.assertj.core.api.Assertions.assertThat;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
@@ -53,6 +52,7 @@ import org.springframework.web.client.RestTemplate;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Test case for {@link VictoriaMetricsDataStorage}
@@ -72,18 +72,33 @@ class VictoriaMetricsDataStorageTest {
private VictoriaMetricsDataStorage victoriaMetricsDataStorage;
+ private final AtomicInteger postForEntityCount = new AtomicInteger(0);
+
@BeforeEach
void setUp() {
when(victoriaMetricsProperties.enabled()).thenReturn(true);
when(victoriaMetricsProperties.url()).thenReturn("http://localhost:8428");
when(victoriaMetricsProperties.username()).thenReturn("root");
when(victoriaMetricsProperties.password()).thenReturn("root");
+
// on successful write, VictoriaMetrics returns HTTP 204 (No Content)
when(responseEntity.getStatusCode()).thenReturn(HttpStatus.NO_CONTENT);
- when(restTemplate.exchange(anyString(), eq(HttpMethod.GET),
any(HttpEntity.class), eq(String.class)))
- .thenReturn(responseEntity);
- when(restTemplate.postForEntity(anyString(), any(HttpEntity.class),
eq(String.class)))
- .thenReturn(responseEntity);
+
+ when(restTemplate.exchange(
+ anyString(),
+ eq(HttpMethod.GET),
+ any(HttpEntity.class),
+ eq(String.class)
+ )).thenReturn(responseEntity);
+
+ when(restTemplate.postForEntity(
+ startsWith(victoriaMetricsProperties.url()),
+ any(HttpEntity.class),
+ eq(String.class)
+ )).thenAnswer(invocation -> {
+ postForEntityCount.incrementAndGet();
+ return responseEntity;
+ });
}
@Test
@@ -92,14 +107,11 @@ class VictoriaMetricsDataStorageTest {
victoriaMetricsDataStorage = new
VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate);
// execute one-time data insertion
victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
- // wait for the timer's first insertion task execution and verify if
it was called once
- Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
- verify(restTemplate, times(1)).postForEntity(
- startsWith(victoriaMetricsProperties.url()),
- any(HttpEntity.class),
- eq(String.class)
- )
- );
+ // wait for the timer's first insertion task execution and verify if
it was called once (default 3 seconds)
+ Awaitility.await()
+ .pollInterval(2, TimeUnit.SECONDS)
+ .atMost(7, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(postForEntityCount.get()).isEqualTo(1));
}
@Test
@@ -109,28 +121,15 @@ class VictoriaMetricsDataStorageTest {
10, Integer.MAX_VALUE, new
VictoriaMetricsProperties.Compression(false)));
victoriaMetricsDataStorage = new
VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate);
- victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
- // wait for the timer to execute its first insertion task
- Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
- verify(restTemplate, times(1)).postForEntity(
- startsWith(victoriaMetricsProperties.url()),
- any(HttpEntity.class),
- eq(String.class)
- )
- );
-
// triggers the buffer size insertion condition
for (int i = 0; i < 10 * 0.8; i++) {
victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
}
- // wait for the timer to execute the task again
- Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
- verify(restTemplate, times(2)).postForEntity(
- startsWith(victoriaMetricsProperties.url()),
- any(HttpEntity.class),
- eq(String.class)
- )
- );
+ // wait for the timer to execute the task
+ Awaitility.await()
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(postForEntityCount.get()).isEqualTo(1));
}
@Test
@@ -142,22 +141,47 @@ class VictoriaMetricsDataStorageTest {
victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
// wait for the timer to execute its first insertion task
- Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
- verify(restTemplate, times(1)).postForEntity(
- startsWith(victoriaMetricsProperties.url()),
- any(HttpEntity.class),
- eq(String.class)
- )
- );
+ Awaitility.await()
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(postForEntityCount.get()).isEqualTo(1));
victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
- // wait for the flush interval to be triggered
- Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
- verify(restTemplate, times(2)).postForEntity(
- startsWith(victoriaMetricsProperties.url()),
- any(),
- eq(String.class)
- ));
+ // wait for the flush interval to be triggered again
+ Awaitility.await()
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(postForEntityCount.get()).isEqualTo(2));
+ }
+
+ @Test
+ void testMultiThreadSaveDataBySize() {
+ int threadCount = 100;
+ int bufferSize = 10;
+ int writeSize = (int) (bufferSize * 0.8);
+
+ // verify the insert path triggered by buffer size, with the flush interval
set to an effectively unreachable value (Integer.MAX_VALUE)
+ when(victoriaMetricsProperties.insert()).thenReturn(new
VictoriaMetricsProperties.InsertConfig(
+ bufferSize, Integer.MAX_VALUE, new
VictoriaMetricsProperties.Compression(false)));
+ victoriaMetricsDataStorage = new
VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate);
+
+ for (int i = 0; i < threadCount; i++) {
+ new Thread(() -> {
+ // triggers the buffer size insertion condition
+ for (int j = 0; j < writeSize; j++) {
+
victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
+ }
+ }).start();
+ }
+
+ // wait for the timer to execute the task
+ Awaitility.await()
+ .pollInterval(3, TimeUnit.SECONDS)
+ .atMost(15, TimeUnit.SECONDS)
+ .untilAsserted(() ->
+ assertThat(postForEntityCount.get())
+ // minimum flushes: ensure all data is
processed (threadCount * writeSize / bufferSize)
+ .isGreaterThanOrEqualTo(threadCount *
writeSize / bufferSize));
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]