This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 360abcc61d [feature] Using duckdb instead of jpa to store history data
(#3883)
360abcc61d is described below
commit 360abcc61dd0ff1376c6f4c73119de8dffe3bff8
Author: Logic <[email protected]>
AuthorDate: Thu Dec 4 23:51:04 2025 +0800
[feature] Using duckdb instead of jpa to store history data (#3883)
Signed-off-by: Logic <[email protected]>
Co-authored-by: aias00 <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.../log/alert/LogPeriodicAlertE2eTest.java | 2 +-
.../log/storage/GreptimeLogStorageE2eTest.java | 2 +-
.../src/main/resources/application-test.yml | 5 +-
.../src/main/resources/application.yml | 7 +-
hertzbeat-warehouse/pom.xml | 8 +-
.../warehouse/constants/WarehouseConstants.java | 2 +
.../tsdb/duckdb/DuckdbDatabaseDataStorage.java | 402 +++++++++++++++++++++
.../DuckdbProperties.java} | 14 +-
.../history/tsdb/jpa/JpaDatabaseDataStorage.java | 273 --------------
material/licenses/LICENSE | 1 +
pom.xml | 1 +
script/application.yml | 7 +-
12 files changed, 427 insertions(+), 297 deletions(-)
diff --git
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
index e3c387dbe6..8ad662c2b0 100644
---
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
+++
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
@@ -102,7 +102,7 @@ public class LogPeriodicAlertE2eTest {
@DynamicPropertySource
static void greptimeProps(DynamicPropertyRegistry r) {
// Configure GreptimeDB storage
- r.add("warehouse.store.jpa.enabled", () -> "false");
+ r.add("warehouse.store.duckdb.enabled", () -> "false");
r.add("warehouse.store.greptime.enabled", () -> "true");
r.add("warehouse.store.greptime.http-endpoint", () ->
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" +
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
diff --git
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
index 4aba2f1eb2..6a85ffa107 100644
---
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
+++
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
@@ -86,7 +86,7 @@ public class GreptimeLogStorageE2eTest {
@DynamicPropertySource
static void greptimeProps(DynamicPropertyRegistry r) {
- r.add("warehouse.store.jpa.enabled", () -> "false");
+ r.add("warehouse.store.duckdb.enabled", () -> "false");
r.add("warehouse.store.greptime.enabled", () -> "true");
r.add("warehouse.store.greptime.http-endpoint", () ->
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" +
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
diff --git a/hertzbeat-startup/src/main/resources/application-test.yml
b/hertzbeat-startup/src/main/resources/application-test.yml
index 4aa13715ff..029530d621 100644
--- a/hertzbeat-startup/src/main/resources/application-test.yml
+++ b/hertzbeat-startup/src/main/resources/application-test.yml
@@ -55,10 +55,9 @@ common:
warehouse:
store:
- jpa:
+ duckdb:
enabled: true
- expire-time: 1h
- max-history-record-num: 6000
+ expire-time: 90d
victoria-metrics:
enabled: false
url: http://localhost:8428
diff --git a/hertzbeat-startup/src/main/resources/application.yml
b/hertzbeat-startup/src/main/resources/application.yml
index e141b4606c..ea661edf93 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -169,13 +169,10 @@ common:
warehouse:
store:
# store history metrics data, enable only one below
- jpa:
+ duckdb:
enabled: true
# The maximum retention time for history records, after which records
will be deleted
- expire-time: 1h
- # The maximum number of history records retained, if this number is
exceeded, half of the data in this configuration item will be deleted
- # (please set this configuration reasonably as history records can
affect performance when it is large)
- max-history-record-num: 6000
+ expire-time: 90d
victoria-metrics:
# Standalone mode toggle — must be set to false when using cluster mode
enabled: false
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index 95a32a5012..169d014d74 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -150,11 +150,17 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
-
+
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy-java.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.duckdb</groupId>
+ <artifactId>duckdb_jdbc</artifactId>
+ <version>${duckdb.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
index b646b5f9ec..ad9cfb8253 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
@@ -46,6 +46,8 @@ public interface WarehouseConstants {
String VM_CLUSTER = "victoria-metrics.cluster";
String QUEST_DB = "questdb";
+
+ String DUCKDB = "duckdb";
}
/**
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbDatabaseDataStorage.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbDatabaseDataStorage.java
new file mode 100644
index 0000000000..f8c274fb4b
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbDatabaseDataStorage.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.duckdb;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.time.temporal.TemporalAmount;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * History metrics storage backed by an embedded DuckDB database file ({@code data/history.duckdb}).
+ *
+ * <p>Every collected field value is stored as one row of the {@code hzb_history} table. Each
+ * operation opens its own JDBC connection to the file. A daemon thread deletes rows older than
+ * {@code warehouse.store.duckdb.expire-time} once per hour, starting five minutes after startup.
+ */
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.duckdb", name = "enabled", havingValue = "true")
+@Slf4j
+public class DuckdbDatabaseDataStorage extends AbstractHistoryDataStorage {
+
+    private static final String DRIVER_NAME = "org.duckdb.DuckDBDriver";
+    private static final String URL_PREFIX = "jdbc:duckdb:";
+    private static final String DEFAULT_DB_FILE = "data/history.duckdb";
+
+    // Ideal number of data points for charting (avoids frontend lag)
+    private static final int TARGET_CHART_POINTS = 800;
+
+    // Smallest bucket width used by the interval (aggregated) query: one minute
+    private static final long MIN_BUCKET_MILLIS = 60_000L;
+
+    // Window used by the interval query when no usable history token is supplied: 24 hours
+    private static final long DEFAULT_INTERVAL_WINDOW_MILLIS = 24 * 60 * 60 * 1000L;
+
+    // Retention applied when expire-time cannot be parsed
+    private static final Duration DEFAULT_EXPIRE_DURATION = Duration.ofDays(30);
+
+    // Safety cap on rows returned by the raw (non-aggregated) history query
+    private static final int RAW_QUERY_LIMIT = 20000;
+
+    // Column order: instance, app, metrics, metric, metric_type, int32_value, double_value,
+    // str_value, record_time, labels
+    private static final String INSERT_SQL = "INSERT INTO hzb_history VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+    /**
+     * Retention of history rows: a duration token such as {@code 90d}, or a bare number which the
+     * cleaner interprets as seconds. Field-injected after construction; it is first read by the
+     * cleaner five minutes after startup.
+     * NOTE(review): this default (30d) differs from DuckdbProperties' default (90d), and that record
+     * documents the value as milliseconds - confirm the intended default and unit.
+     */
+    @org.springframework.beans.factory.annotation.Value("${warehouse.store.duckdb.expire-time:30d}")
+    private String expireTimeStr;
+
+    private final String dbPath;
+
+    // Executor running the periodic cleanup; null when initialisation failed. Stopped in destroy().
+    private ScheduledExecutorService cleanerExecutor;
+
+    public DuckdbDatabaseDataStorage() {
+        this.dbPath = DEFAULT_DB_FILE;
+        this.serverAvailable = initDuckDb();
+        if (this.serverAvailable) {
+            startExpiredDataCleaner();
+        }
+    }
+
+    /** Opens a new JDBC connection to the DuckDB database file; callers must close it. */
+    private Connection openConnection() throws SQLException {
+        return DriverManager.getConnection(URL_PREFIX + dbPath);
+    }
+
+    /**
+     * Loads the DuckDB driver, ensures the database file's parent directory exists, and creates the
+     * history table and its indexes when they are missing.
+     *
+     * @return true when the database is ready for use; false when any step failed (the error is logged)
+     */
+    private boolean initDuckDb() {
+        try {
+            Class.forName(DRIVER_NAME);
+            // Create data/ up front so a fresh installation can open the database file
+            Path parent = Path.of(dbPath).toAbsolutePath().getParent();
+            if (parent != null) {
+                Files.createDirectories(parent);
+            }
+            try (Connection connection = openConnection();
+                 Statement statement = connection.createStatement()) {
+                // instance app metrics metric metric_type int32_value double_value str_value record_time labels
+                String createTableSql = """
+                        CREATE TABLE IF NOT EXISTS hzb_history (
+                        instance VARCHAR,
+                        app VARCHAR,
+                        metrics VARCHAR,
+                        metric VARCHAR,
+                        metric_type SMALLINT,
+                        int32_value INTEGER,
+                        double_value DOUBLE,
+                        str_value VARCHAR,
+                        record_time BIGINT,
+                        labels VARCHAR)""";
+                statement.execute(createTableSql);
+                // Add composite index for query performance
+                statement.execute("CREATE INDEX IF NOT EXISTS idx_hzb_history_composite "
+                        + "ON hzb_history(instance, app, metrics, metric, record_time)");
+                // Add index for cleanup performance
+                statement.execute("CREATE INDEX IF NOT EXISTS idx_hzb_history_record_time ON hzb_history(record_time)");
+                return true;
+            }
+        } catch (Exception e) {
+            log.error("Failed to init duckdb: {}", e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /** Schedules {@link #deleteExpiredData()} on a daemon thread: first after 5 minutes, then hourly. */
+    private void startExpiredDataCleaner() {
+        cleanerExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread thread = new Thread(r, "duckdb-cleaner");
+            thread.setDaemon(true);
+            return thread;
+        });
+        cleanerExecutor.scheduleAtFixedRate(this::deleteExpiredData, 5, 60, TimeUnit.MINUTES);
+    }
+
+    /** Deletes every row whose record_time is older than the configured retention cutoff. */
+    private void deleteExpiredData() {
+        long expireTime = resolveExpireCutoffMillis();
+        try (Connection connection = openConnection();
+             PreparedStatement statement = connection.prepareStatement("DELETE FROM hzb_history WHERE record_time < ?")) {
+            statement.setLong(1, expireTime);
+            int rows = statement.executeUpdate();
+            if (rows > 0) {
+                log.info("[duckdb] delete {} expired records.", rows);
+            }
+        } catch (Exception e) {
+            log.error("[duckdb] clean expired data error: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Computes the retention cutoff in epoch milliseconds from {@link #expireTimeStr}.
+     *
+     * @return now minus the configured retention; now minus 30 days when expire-time cannot be parsed
+     */
+    private long resolveExpireCutoffMillis() {
+        try {
+            if (NumberUtils.isParsable(expireTimeStr)) {
+                // A bare number is a retention in seconds; the cutoff must lie in the past,
+                // otherwise every row would be older than it and get deleted.
+                long retentionSeconds = NumberUtils.toLong(expireTimeStr);
+                return (ZonedDateTime.now().toEpochSecond() - retentionSeconds) * 1000L;
+            }
+            TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
+            return ZonedDateTime.now().minus(temporalAmount).toEpochSecond() * 1000L;
+        } catch (Exception e) {
+            log.error("expiredDataCleaner time error: {}. use default expire time to clean: 30d", e.getMessage());
+            return ZonedDateTime.now().minus(DEFAULT_EXPIRE_DURATION).toEpochSecond() * 1000L;
+        }
+    }
+
+    /**
+     * Stores one row per cell of every successful, non-empty metrics payload in a single batch.
+     * A cell whose value cannot be bound is logged and skipped instead of discarding the whole batch.
+     */
+    @Override
+    public void saveData(CollectRep.MetricsData metricsData) {
+        if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS || metricsData.getValues().isEmpty()) {
+            return;
+        }
+
+        String instance = metricsData.getInstance();
+        String monitorType = metricsData.getApp();
+        String metrics = metricsData.getMetrics();
+        long time = metricsData.getTime();
+
+        try (Connection connection = openConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) {
+
+            RowWrapper rowWrapper = metricsData.readRow();
+            Map<String, String> labels = new HashMap<>();
+
+            while (rowWrapper.hasNextRow()) {
+                rowWrapper = rowWrapper.nextRow();
+
+                // First pass: collect this row's label fields; their JSON identifies the series
+                labels.clear();
+                rowWrapper.cellStream().forEach(cell -> {
+                    if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL)) {
+                        labels.put(cell.getField().getName(), cell.getValue());
+                    }
+                });
+                String labelsJson = JsonUtil.toJson(labels);
+
+                // Second pass: one history row per cell
+                rowWrapper.cellStream().forEach(cell -> {
+                    String metric = cell.getField().getName();
+                    try {
+                        preparedStatement.setString(1, instance);
+                        preparedStatement.setString(2, monitorType);
+                        preparedStatement.setString(3, metrics);
+                        preparedStatement.setString(4, metric);
+                        bindValueColumns(preparedStatement, metric, cell.getValue(),
+                                cell.getMetadataAsInteger(MetricDataConstants.TYPE));
+                        preparedStatement.setLong(9, time);
+                        preparedStatement.setString(10, labelsJson);
+                        preparedStatement.addBatch();
+                    } catch (SQLException e) {
+                        log.error("error setting prepared statement", e);
+                    }
+                });
+            }
+            preparedStatement.executeBatch();
+        } catch (Exception e) {
+            log.error("[duckdb] save data error: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Binds metric_type (5), int32_value (6), double_value (7) and str_value (8) for one cell.
+     * The stored metric type follows the field type even for null values. Unparsable numbers and
+     * non-finite doubles (NaN/Infinity) are stored as NULL: they would otherwise abort the batch or
+     * make BigDecimal formatting fail when the data is read back.
+     */
+    private void bindValueColumns(PreparedStatement ps, String metric, String columnValue, int fieldType)
+            throws SQLException {
+        boolean missing = CommonConstants.NULL_VALUE.equals(columnValue);
+        short metricType;
+        Integer int32Value = null;
+        Double doubleValue = null;
+        String strValue = null;
+        switch (fieldType) {
+            case CommonConstants.TYPE_STRING -> {
+                metricType = (short) CommonConstants.TYPE_STRING;
+                strValue = missing ? null : columnValue;
+            }
+            case CommonConstants.TYPE_TIME -> {
+                metricType = (short) CommonConstants.TYPE_TIME;
+                if (!missing) {
+                    try {
+                        int32Value = Integer.parseInt(columnValue);
+                    } catch (NumberFormatException nfe) {
+                        log.warn("Failed to parse columnValue '{}' as integer for metric '{}'. Setting value to null.", columnValue, metric, nfe);
+                    }
+                }
+            }
+            default -> {
+                metricType = (short) CommonConstants.TYPE_NUMBER;
+                if (!missing) {
+                    try {
+                        double parsed = Double.parseDouble(columnValue);
+                        doubleValue = Double.isFinite(parsed) ? parsed : null;
+                    } catch (NumberFormatException nfe) {
+                        log.warn("Failed to parse columnValue '{}' as double for metric '{}'. Setting value to null.", columnValue, metric, nfe);
+                    }
+                }
+            }
+        }
+        ps.setShort(5, metricType);
+        if (int32Value == null) {
+            ps.setObject(6, null);
+        } else {
+            ps.setInt(6, int32Value);
+        }
+        if (doubleValue == null) {
+            ps.setObject(7, null);
+        } else {
+            ps.setDouble(7, doubleValue);
+        }
+        if (strValue == null) {
+            ps.setObject(8, null);
+        } else {
+            ps.setString(8, strValue);
+        }
+    }
+
+    /**
+     * Returns raw history values (newest first, at most {@link #RAW_QUERY_LIMIT} rows), grouped by
+     * the labels JSON of each series ("" when a row has no labels).
+     */
+    @Override
+    public Map<String, List<Value>> getHistoryMetricData(String instance, String app, String metrics, String metric, String history) {
+        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
+        if (!isServerAvailable()) {
+            return instanceValuesMap;
+        }
+
+        OptionalLong timeBefore = parseHistoryStartMillis(history);
+        StringBuilder sqlBuilder = new StringBuilder("SELECT record_time, metric_type, int32_value, double_value,")
+                .append(" str_value, labels FROM hzb_history WHERE instance = ? AND ")
+                .append(appCondition(app))
+                .append(" AND metrics = ? AND metric = ?");
+        if (timeBefore.isPresent()) {
+            sqlBuilder.append(" AND record_time >= ?");
+        }
+        // Raw data query - limit to avoid memory issues
+        sqlBuilder.append(" ORDER BY record_time DESC LIMIT ").append(RAW_QUERY_LIMIT);
+
+        try (Connection connection = openConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(sqlBuilder.toString())) {
+            preparedStatement.setString(1, instance);
+            preparedStatement.setString(2, appParameter(app));
+            preparedStatement.setString(3, metrics);
+            preparedStatement.setString(4, metric);
+            // Bound only when the time predicate was appended, keeping placeholders and binds in sync
+            if (timeBefore.isPresent()) {
+                preparedStatement.setLong(5, timeBefore.getAsLong());
+            }
+
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    long time = resultSet.getLong("record_time");
+                    int type = resultSet.getShort("metric_type");
+                    String labels = resultSet.getString("labels");
+                    String value = formatValue(type, resultSet);
+                    instanceValuesMap.computeIfAbsent(labels == null ? "" : labels, k -> new LinkedList<>())
+                            .add(new Value(value, time));
+                }
+            }
+        } catch (SQLException e) {
+            log.error("[duckdb] query data error: {}", e.getMessage(), e);
+        }
+        return instanceValuesMap;
+    }
+
+    /**
+     * Returns history values aggregated into time buckets (about {@link #TARGET_CHART_POINTS} per
+     * window, at least one minute wide), grouped by the labels JSON of each series. Numeric buckets
+     * carry avg (as value and mean), min and max. Time buckets carry one int32 value and other types
+     * one string value from the bucket. Without a usable history token the last 24 hours are used.
+     */
+    @Override
+    public Map<String, List<Value>> getHistoryIntervalMetricData(String instance, String app, String metrics, String metric, String history) {
+        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
+        if (!isServerAvailable()) {
+            return instanceValuesMap;
+        }
+
+        long endTime = System.currentTimeMillis();
+        long timeBefore = parseHistoryStartMillis(history).orElse(endTime - DEFAULT_INTERVAL_WINDOW_MILLIS);
+        // Always positive, so the bucket expression below can never divide by zero
+        long interval = Math.max((endTime - timeBefore) / TARGET_CHART_POINTS, MIN_BUCKET_MILLIS);
+
+        // Floor each record_time to its bucket start with integer modulo, independent of how
+        // '/' and CAST would round.
+        String sql = "SELECT record_time - (record_time % CAST(? AS BIGINT)) AS ts_bucket,"
+                + " AVG(double_value) AS avg_val, MIN(double_value) AS min_val, MAX(double_value) AS max_val,"
+                + " FIRST(int32_value) AS int_val, FIRST(str_value) AS str_val, metric_type, labels"
+                + " FROM hzb_history"
+                + " WHERE instance = ? AND " + appCondition(app) + " AND metrics = ? AND metric = ? AND record_time >= ?"
+                + " GROUP BY ts_bucket, metric_type, labels"
+                + " ORDER BY ts_bucket";
+
+        try (Connection connection = openConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.setLong(1, interval);
+            preparedStatement.setString(2, instance);
+            preparedStatement.setString(3, appParameter(app));
+            preparedStatement.setString(4, metrics);
+            preparedStatement.setString(5, metric);
+            preparedStatement.setLong(6, timeBefore);
+
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    long time = resultSet.getLong("ts_bucket");
+                    int type = resultSet.getShort("metric_type");
+                    String labels = resultSet.getString("labels");
+                    instanceValuesMap.computeIfAbsent(labels == null ? "" : labels, k -> new LinkedList<>())
+                            .add(toIntervalValue(resultSet, type, time));
+                }
+            }
+        } catch (SQLException e) {
+            log.error("[duckdb] query interval data error: {}", e.getMessage(), e);
+        }
+        return instanceValuesMap;
+    }
+
+    /** Converts the current row of the interval query into a {@link Value} stamped with {@code time}. */
+    private Value toIntervalValue(ResultSet resultSet, int type, long time) throws SQLException {
+        if (type == CommonConstants.TYPE_NUMBER) {
+            double avg = resultSet.getDouble("avg_val");
+            if (resultSet.wasNull()) {
+                return new Value(null, time);
+            }
+            double min = resultSet.getDouble("min_val");
+            double max = resultSet.getDouble("max_val");
+            // BigDecimal cannot represent NaN/Infinity
+            if (!Double.isFinite(avg) || !Double.isFinite(min) || !Double.isFinite(max)) {
+                return new Value(null, time);
+            }
+            String avgStr = formatDecimal(avg);
+            Value valueObj = new Value(avgStr, time);
+            valueObj.setMean(avgStr);
+            valueObj.setMin(formatDecimal(min));
+            valueObj.setMax(formatDecimal(max));
+            return valueObj;
+        }
+        if (type == CommonConstants.TYPE_TIME) {
+            int intVal = resultSet.getInt("int_val");
+            return new Value(resultSet.wasNull() ? null : String.valueOf(intVal), time);
+        }
+        // For other non-numeric types we just take the FIRST value in the bucket
+        return new Value(resultSet.getString("str_val"), time);
+    }
+
+    /**
+     * Parses a history token (e.g. "6h") into the start of the query window in epoch milliseconds.
+     *
+     * @return empty when {@code history} is null or cannot be parsed (the parse error is logged)
+     */
+    private OptionalLong parseHistoryStartMillis(String history) {
+        if (history == null) {
+            return OptionalLong.empty();
+        }
+        try {
+            TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
+            return OptionalLong.of(ZonedDateTime.now().minus(temporalAmount).toEpochSecond() * 1000L);
+        } catch (Exception e) {
+            log.error("parse history time error: {}", e.getMessage());
+            return OptionalLong.empty();
+        }
+    }
+
+    /** SQL predicate on the app column: prometheus queries match every app carrying the prometheus prefix. */
+    private static String appCondition(String app) {
+        return CommonConstants.PROMETHEUS.equals(app) ? "app LIKE ?" : "app = ?";
+    }
+
+    /** Bind value paired with {@link #appCondition(String)}. */
+    private static String appParameter(String app) {
+        return CommonConstants.PROMETHEUS.equals(app) ? CommonConstants.PROMETHEUS_APP_PREFIX + "%" : app;
+    }
+
+    /** Formats one raw history row's value according to its stored metric type ("" for a missing number/time). */
+    private String formatValue(int type, ResultSet resultSet) throws SQLException {
+        if (type == CommonConstants.TYPE_NUMBER) {
+            double v = resultSet.getDouble("double_value");
+            if (!resultSet.wasNull() && Double.isFinite(v)) {
+                return formatDecimal(v);
+            }
+        } else if (type == CommonConstants.TYPE_TIME) {
+            int v = resultSet.getInt("int32_value");
+            if (!resultSet.wasNull()) {
+                return String.valueOf(v);
+            }
+        } else {
+            return resultSet.getString("str_value");
+        }
+        return "";
+    }
+
+    /** Rounds to at most four decimal places (HALF_UP) and renders without trailing zeros or exponent. */
+    private static String formatDecimal(double v) {
+        return BigDecimal.valueOf(v).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+    }
+
+    /** Stops the expired-data cleaner, if it was started. */
+    @Override
+    public void destroy() throws Exception {
+        if (cleanerExecutor != null) {
+            cleanerExecutor.shutdownNow();
+        }
+    }
+}
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaProperties.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbProperties.java
similarity index 73%
rename from
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaProperties.java
rename to
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbProperties.java
index 0f8c586815..40fba5fe18 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaProperties.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbProperties.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.hertzbeat.warehouse.store.history.tsdb.jpa;
+package org.apache.hertzbeat.warehouse.store.history.tsdb.duckdb;
import org.apache.hertzbeat.common.constants.ConfigConstants;
import org.apache.hertzbeat.common.constants.SignConstants;
@@ -24,18 +24,16 @@ import
org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.bind.DefaultValue;
/**
- * JPA configuration information
- * @param enabled use mysql/h2 jpa store metrics history data
+ * Duckdb configuration information
+ * @param enabled use duckdb store metrics history data
* @param expireTime save data expire time(ms)
- * @param maxHistoryRecordNum The maximum number of history records retained
*/
@ConfigurationProperties(prefix =
ConfigConstants.FunctionModuleConstants.WAREHOUSE
+ SignConstants.DOT
+ WarehouseConstants.STORE
+ SignConstants.DOT
- + WarehouseConstants.HistoryName.JPA)
-public record JpaProperties(@DefaultValue("true") boolean enabled,
- @DefaultValue("1h") String expireTime,
- @DefaultValue("20000") Integer
maxHistoryRecordNum) {
+ + WarehouseConstants.HistoryName.DUCKDB)
+public record DuckdbProperties(@DefaultValue("true") boolean enabled,
+ @DefaultValue("90d") String expireTime) {
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaDatabaseDataStorage.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaDatabaseDataStorage.java
deleted file mode 100644
index 6c31ab9a1c..0000000000
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaDatabaseDataStorage.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.warehouse.store.history.tsdb.jpa;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import jakarta.persistence.criteria.Predicate;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.constants.MetricDataConstants;
-import org.apache.hertzbeat.common.entity.arrow.ArrowCell;
-import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
-import org.apache.hertzbeat.common.entity.dto.Value;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
-import org.apache.hertzbeat.common.entity.warehouse.History;
-import org.apache.hertzbeat.common.util.JsonUtil;
-import org.apache.hertzbeat.common.util.TimePeriodUtil;
-import org.apache.hertzbeat.warehouse.dao.HistoryDao;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.data.domain.Sort;
-import org.springframework.data.jpa.domain.Specification;
-import org.springframework.stereotype.Component;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.time.Duration;
-import java.time.ZonedDateTime;
-import java.time.temporal.TemporalAmount;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.LinkedList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * data storage by mysql/h2/pgsql - jpa
- */
-@Component
-@ConditionalOnProperty(prefix = "warehouse.store.jpa", name = "enabled",
havingValue = "true")
-@Slf4j
-public class JpaDatabaseDataStorage extends AbstractHistoryDataStorage {
- private final HistoryDao historyDao;
- private final JpaProperties jpaProperties;
-
- private static final int STRING_MAX_LENGTH = 1024;
-
- public JpaDatabaseDataStorage(JpaProperties jpaProperties,
- HistoryDao historyDao) {
- this.jpaProperties = jpaProperties;
- this.serverAvailable = true;
- this.historyDao = historyDao;
- expiredDataCleaner();
- }
-
- public void expiredDataCleaner() {
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setUncaughtExceptionHandler((thread, throwable) -> {
- log.error("Jpa metrics store has uncaughtException.");
- log.error(throwable.getMessage(), throwable);
- })
- .setDaemon(true)
- .setNameFormat("jpa-metrics-cleaner-%d")
- .build();
- ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- scheduledExecutor.scheduleAtFixedRate(() -> {
- log.warn("[jpa-metrics-store]-start running expired data cleaner."
- + "Please use time series db instead of jpa for better
performance");
- String expireTimeStr = jpaProperties.expireTime();
- long expireTime;
- try {
- if (NumberUtils.isParsable(expireTimeStr)) {
- expireTime = NumberUtils.toLong(expireTimeStr);
- expireTime = (ZonedDateTime.now().toEpochSecond() +
expireTime) * 1000L;
- } else {
- TemporalAmount temporalAmount =
TimePeriodUtil.parseTokenTime(expireTimeStr);
- ZonedDateTime dateTime =
ZonedDateTime.now().minus(temporalAmount);
- expireTime = dateTime.toEpochSecond() * 1000L;
- }
- } catch (Exception e) {
- log.error("expiredDataCleaner time error: {}. use default
expire time to clean: 1h", e.getMessage());
- ZonedDateTime dateTime =
ZonedDateTime.now().minus(Duration.ofHours(1));
- expireTime = dateTime.toEpochSecond() * 1000L;
- }
- try {
- int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
- log.info("[jpa-metrics-store]-delete {} rows.", rows);
- long total = historyDao.count();
- if (total > jpaProperties.maxHistoryRecordNum()) {
- rows =
historyDao.deleteOlderHistoriesRecord(jpaProperties.maxHistoryRecordNum() / 2);
- log.warn("[jpa-metrics-store]-force delete {} rows due too
many. Please use time series db instead of jpa for better performance.", rows);
- }
- } catch (Exception e) {
- log.error("expiredDataCleaner database error: {}.",
e.getMessage());
- log.error("try to truncate table hzb_history. Please use time
series db instead of jpa for better performance.");
- historyDao.truncateTable();
- }
- }, 5, 30, TimeUnit.SECONDS);
- }
-
- @Override
- public void saveData(CollectRep.MetricsData metricsData) {
- if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
- return;
- }
- if (metricsData.getValues().isEmpty()) {
- log.info("[warehouse jpa] flush metrics data {} is null, ignore.",
metricsData.getInstance());
- return;
- }
- String monitorType = metricsData.getApp();
- String metrics = metricsData.getMetrics();
-
- try {
- List<History> allHistoryList = Lists.newArrayList();
- Map<String, String> labels = Maps.newHashMapWithExpectedSize(8);
- RowWrapper rowWrapper = metricsData.readRow();
-
- while (rowWrapper.hasNextRow()) {
- rowWrapper = rowWrapper.nextRow();
- List<History> singleHistoryList = new ArrayList<>();
-
- rowWrapper.cellStream().forEach(cell ->
singleHistoryList.add(buildHistory(metricsData, cell, monitorType, metrics,
labels)));
- singleHistoryList.forEach(history ->
history.setMetricLabels(JsonUtil.toJson(labels)));
-
- allHistoryList.addAll(singleHistoryList);
- }
-
- historyDao.saveAll(allHistoryList);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }
-
- private History buildHistory(CollectRep.MetricsData metricsData, ArrowCell
cell, String monitorType, String metrics, Map<String, String> labels) {
- History.HistoryBuilder historyBuilder = History.builder()
- .instance(metricsData.getInstance())
- .app(monitorType)
- .metrics(metrics)
- .time(metricsData.getTime())
- .metric(cell.getField().getName());
-
- final String columnValue = cell.getValue();
- final int fieldType =
cell.getMetadataAsInteger(MetricDataConstants.TYPE);
- if (CommonConstants.NULL_VALUE.equals(columnValue)) {
- switch (fieldType) {
- case CommonConstants.TYPE_NUMBER ->
- historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
- .dou(null);
- case CommonConstants.TYPE_STRING ->
- historyBuilder.metricType(CommonConstants.TYPE_STRING)
- .str(null);
- case CommonConstants.TYPE_TIME ->
historyBuilder.metricType(CommonConstants.TYPE_TIME)
- .int32(null);
- default ->
historyBuilder.metricType(CommonConstants.TYPE_NUMBER);
- }
- } else {
- switch (fieldType) {
- case CommonConstants.TYPE_STRING ->
- historyBuilder.metricType(CommonConstants.TYPE_STRING)
- .str(formatStrValue(columnValue));
- case CommonConstants.TYPE_TIME ->
historyBuilder.metricType(CommonConstants.TYPE_TIME)
- .int32(Integer.parseInt(columnValue));
- default -> {
- Double v = Double.parseDouble(columnValue);
- v = v.isNaN() ? null : v;
- historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
- .dou(v);
- }
- }
-
- if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL)) {
- labels.put(cell.getField().getName(), columnValue);
- }
- }
-
- return historyBuilder.build();
- }
-
- @Override
- public Map<String, List<Value>> getHistoryMetricData(String instance,
String app, String metrics, String metric, String history) {
- Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
- Specification<History> specification = (root, query, criteriaBuilder)
-> {
- List<Predicate> andList = new ArrayList<>();
- Predicate predicateInstance =
criteriaBuilder.equal(root.get("instance"), instance);
- Predicate predicateMonitorType =
criteriaBuilder.equal(root.get("app"), app);
- if (CommonConstants.PROMETHEUS.equals(app)) {
- predicateMonitorType = criteriaBuilder.like(root.get("app"),
CommonConstants.PROMETHEUS_APP_PREFIX + "%");
- }
- Predicate predicateMonitorMetrics =
criteriaBuilder.equal(root.get("metrics"), metrics);
- Predicate predicateMonitorMetric =
criteriaBuilder.equal(root.get("metric"), metric);
- andList.add(predicateInstance);
- andList.add(predicateMonitorType);
- andList.add(predicateMonitorMetrics);
- andList.add(predicateMonitorMetric);
-
- if (history != null) {
- try {
- TemporalAmount temporalAmount =
TimePeriodUtil.parseTokenTime(history);
- ZonedDateTime dateTime =
ZonedDateTime.now().minus(temporalAmount);
- long timeBefore = dateTime.toEpochSecond() * 1000L;
- Predicate timePredicate =
criteriaBuilder.ge(root.get("time"), timeBefore);
- andList.add(timePredicate);
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }
- Predicate[] predicates = new Predicate[andList.size()];
- Predicate predicate =
criteriaBuilder.and(andList.toArray(predicates));
- return query.where(predicate).getRestriction();
- };
- Sort sortExp = Sort.by(new Sort.Order(Sort.Direction.DESC, "time"));
- List<History> historyList = historyDao.findAll(specification, sortExp);
- for (History dataItem : historyList) {
- String value = "";
- if (dataItem.getMetricType() == CommonConstants.TYPE_NUMBER) {
- if (dataItem.getDou() != null) {
- value = BigDecimal.valueOf(dataItem.getDou()).setScale(4,
RoundingMode.HALF_UP)
- .stripTrailingZeros().toPlainString();
- }
- } else {
- value = dataItem.getStr();
- }
- String instanceValue = dataItem.getMetricLabels() == null ? "" :
dataItem.getMetricLabels();
- List<Value> valueList =
instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>());
- valueList.add(new Value(value, dataItem.getTime()));
- }
- return instanceValuesMap;
- }
-
- private String formatStrValue(String value) {
- if (value == null) {
- return "";
- }
- value = value.replace("'", "\\'");
- value = value.replace("\"", "\\\"");
- value = value.replace("*", "-");
- value = String.format("`%s`", value);
- if (value.length() > STRING_MAX_LENGTH) {
- value = value.substring(0, STRING_MAX_LENGTH - 1);
- }
- return value;
- }
-
- @Override
- public Map<String, List<Value>> getHistoryIntervalMetricData(String
instance, String app, String metrics, String metric, String history) {
- return new HashMap<>(8);
- }
-
- @Override
- public void destroy() throws Exception {
- }
-}
diff --git a/material/licenses/LICENSE b/material/licenses/LICENSE
index 85eeac878b..cbdcda866a 100644
--- a/material/licenses/LICENSE
+++ b/material/licenses/LICENSE
@@ -541,6 +541,7 @@ The text of each license is also included in
licenses/LICENSE-[project].txt.
https://mvnrepository.com/artifact/com.taosdata.jdbc/taos-jdbcdriver/3.0.0
https://mvnrepository.com/artifact/com.beetstra.jutf7/jutf7/1.0.0
https://mvnrepository.com/artifact/org.testcontainers/kafka/1.20.2 MIT
+ https://mvnrepository.com/artifact/org.duckdb/duckdb_jdbc/1.4.2.0 MIT
========================================================================
MPL-1.1 licenses
diff --git a/pom.xml b/pom.xml
index c8d5d2aa65..5a3904365f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,6 +169,7 @@
<commons-net>3.8.0</commons-net>
<zookeeper.version>3.9.3</zookeeper.version>
+ <duckdb.version>1.4.2.0</duckdb.version>
<iotdb-session.version>0.13.3</iotdb-session.version>
<influxdb.version>2.23</influxdb.version>
<spring-cloud-starter-openfeign.version>3.0.5</spring-cloud-starter-openfeign.version>
diff --git a/script/application.yml b/script/application.yml
index e141b4606c..ea661edf93 100644
--- a/script/application.yml
+++ b/script/application.yml
@@ -169,13 +169,10 @@ common:
warehouse:
store:
# store history metrics data, enable only one below
- jpa:
+ duckdb:
enabled: true
# The maximum retention time for history records, after which records
will be deleted
- expire-time: 1h
- # The maximum number of history records retained, if this number is
exceeded, half of the data in this configuration item will be deleted
- # (please set this configuration reasonably as history records can
affect performance when it is large)
- max-history-record-num: 6000
+ expire-time: 90d
victoria-metrics:
# Standalone mode toggle — must be set to false when using cluster mode
enabled: false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]