This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 360abcc61d [feature] Using duckdb instead of jpa to store history data 
(#3883)
360abcc61d is described below

commit 360abcc61dd0ff1376c6f4c73119de8dffe3bff8
Author: Logic <[email protected]>
AuthorDate: Thu Dec 4 23:51:04 2025 +0800

    [feature] Using duckdb instead of jpa to store history data (#3883)
    
    Signed-off-by: Logic <[email protected]>
    Co-authored-by: aias00 <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 .../log/alert/LogPeriodicAlertE2eTest.java         |   2 +-
 .../log/storage/GreptimeLogStorageE2eTest.java     |   2 +-
 .../src/main/resources/application-test.yml        |   5 +-
 .../src/main/resources/application.yml             |   7 +-
 hertzbeat-warehouse/pom.xml                        |   8 +-
 .../warehouse/constants/WarehouseConstants.java    |   2 +
 .../tsdb/duckdb/DuckdbDatabaseDataStorage.java     | 402 +++++++++++++++++++++
 .../DuckdbProperties.java}                         |  14 +-
 .../history/tsdb/jpa/JpaDatabaseDataStorage.java   | 273 --------------
 material/licenses/LICENSE                          |   1 +
 pom.xml                                            |   1 +
 script/application.yml                             |   7 +-
 12 files changed, 427 insertions(+), 297 deletions(-)

diff --git 
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
 
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
index e3c387dbe6..8ad662c2b0 100644
--- 
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
+++ 
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
@@ -102,7 +102,7 @@ public class LogPeriodicAlertE2eTest {
     @DynamicPropertySource
     static void greptimeProps(DynamicPropertyRegistry r) {
         // Configure GreptimeDB storage
-        r.add("warehouse.store.jpa.enabled", () -> "false");
+        r.add("warehouse.store.duckdb.enabled", () -> "false");
         r.add("warehouse.store.greptime.enabled", () -> "true");
        r.add("warehouse.store.greptime.http-endpoint", () -> 
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
         r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" + 
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
diff --git 
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
 
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
index 4aba2f1eb2..6a85ffa107 100644
--- 
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
+++ 
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
@@ -86,7 +86,7 @@ public class GreptimeLogStorageE2eTest {
 
     @DynamicPropertySource
     static void greptimeProps(DynamicPropertyRegistry r) {
-        r.add("warehouse.store.jpa.enabled", () -> "false");
+        r.add("warehouse.store.duckdb.enabled", () -> "false");
         r.add("warehouse.store.greptime.enabled", () -> "true");
        r.add("warehouse.store.greptime.http-endpoint", () -> 
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
         r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" + 
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
diff --git a/hertzbeat-startup/src/main/resources/application-test.yml 
b/hertzbeat-startup/src/main/resources/application-test.yml
index 4aa13715ff..029530d621 100644
--- a/hertzbeat-startup/src/main/resources/application-test.yml
+++ b/hertzbeat-startup/src/main/resources/application-test.yml
@@ -55,10 +55,9 @@ common:
     
 warehouse:
   store:
-    jpa:
+    duckdb:
       enabled: true
-      expire-time: 1h
-      max-history-record-num: 6000
+      expire-time: 90d
     victoria-metrics:
       enabled: false
       url: http://localhost:8428
diff --git a/hertzbeat-startup/src/main/resources/application.yml 
b/hertzbeat-startup/src/main/resources/application.yml
index e141b4606c..ea661edf93 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -169,13 +169,10 @@ common:
 warehouse:
   store:
     # store history metrics data, enable only one below
-    jpa:
+    duckdb:
       enabled: true
       # The maximum retention time for history records, after which records 
will be deleted
-      expire-time: 1h
-      # The maximum number of history records retained, if this number is 
exceeded, half of the data in this configuration item will be deleted
-      # (please set this configuration reasonably as history records can 
affect performance when it is large)
-      max-history-record-num: 6000
+      expire-time: 90d
     victoria-metrics:
       # Standalone mode toggle — must be set to false when using cluster mode
       enabled: false
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index 95a32a5012..169d014d74 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -150,11 +150,17 @@
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-memory-netty</artifactId>
         </dependency>
-        
+
         <dependency>
             <groupId>org.xerial.snappy</groupId>
             <artifactId>snappy-java</artifactId>
             <version>${snappy-java.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.duckdb</groupId>
+            <artifactId>duckdb_jdbc</artifactId>
+            <version>${duckdb.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
index b646b5f9ec..ad9cfb8253 100644
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
@@ -46,6 +46,8 @@ public interface WarehouseConstants {
         String VM_CLUSTER = "victoria-metrics.cluster";
 
         String QUEST_DB = "questdb";
+
+        String DUCKDB = "duckdb";
     }
 
     /**
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbDatabaseDataStorage.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbDatabaseDataStorage.java
new file mode 100644
index 0000000000..f8c274fb4b
--- /dev/null
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbDatabaseDataStorage.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.duckdb;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import 
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.time.temporal.TemporalAmount;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * data storage by duckdb
+ */
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.duckdb", name = "enabled", 
havingValue = "true")
+@Slf4j
+public class DuckdbDatabaseDataStorage extends AbstractHistoryDataStorage {
+
+    private static final String DRIVER_NAME = "org.duckdb.DuckDBDriver";
+    private static final String URL_PREFIX = "jdbc:duckdb:";
+    private static final String DEFAULT_DB_FILE = "data/history.duckdb";
+
+    // Ideal number of data points for charting (avoids frontend lag)
+    private static final int TARGET_CHART_POINTS = 800;
+
+    
@org.springframework.beans.factory.annotation.Value("${warehouse.store.duckdb.expire-time:30d}")
+    private String expireTimeStr;
+
+    private final String dbPath;
+
+    public DuckdbDatabaseDataStorage() {
+        this.dbPath = DEFAULT_DB_FILE;
+        this.serverAvailable = initDuckDb();
+        if (this.serverAvailable) {
+            startExpiredDataCleaner();
+        }
+    }
+
+    private boolean initDuckDb() {
+        try {
+            Class.forName(DRIVER_NAME);
+            try (Connection connection = 
DriverManager.getConnection(URL_PREFIX + dbPath);
+                 Statement statement = connection.createStatement()) {
+                // instance app metrics metric metric_type int32_value 
double_value str_value record_time labels
+                String createTableSql = """
+                        CREATE TABLE IF NOT EXISTS hzb_history (
+                        instance VARCHAR,
+                        app VARCHAR,
+                        metrics VARCHAR,
+                        metric VARCHAR,
+                        metric_type SMALLINT,
+                        int32_value INTEGER,
+                        double_value DOUBLE,
+                        str_value VARCHAR,
+                        record_time BIGINT,
+                        labels VARCHAR)""";
+                statement.execute(createTableSql);
+                // Add composite index for query performance
+                String createCompositeIndexSql = "CREATE INDEX IF NOT EXISTS 
idx_hzb_history_composite ON hzb_history(instance, app, metrics, metric, 
record_time)";
+                statement.execute(createCompositeIndexSql);
+                // Add index for cleanup performance
+                String createRecordTimeIndexSql = "CREATE INDEX IF NOT EXISTS 
idx_hzb_history_record_time ON hzb_history(record_time)";
+                statement.execute(createRecordTimeIndexSql);
+                return true;
+            }
+        } catch (Exception e) {
+            log.error("Failed to init duckdb: {}", e.getMessage(), e);
+            return false;
+        }
+    }
+
+    private void startExpiredDataCleaner() {
+        ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread thread = new Thread(r, "duckdb-cleaner");
+            thread.setDaemon(true);
+            return thread;
+        });
+        scheduledExecutor.scheduleAtFixedRate(() -> {
+            long expireTime;
+            try {
+                if (NumberUtils.isParsable(expireTimeStr)) {
+                    expireTime = NumberUtils.toLong(expireTimeStr);
+                    expireTime = (ZonedDateTime.now().toEpochSecond() + 
expireTime) * 1000L;
+                } else {
+                    TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(expireTimeStr);
+                    ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
+                    expireTime = dateTime.toEpochSecond() * 1000L;
+                }
+            } catch (Exception e) {
+                log.error("expiredDataCleaner time error: {}. use default 
expire time to clean: 30d", e.getMessage());
+                ZonedDateTime dateTime = 
ZonedDateTime.now().minus(Duration.ofDays(30));
+                expireTime = dateTime.toEpochSecond() * 1000L;
+            }
+            try (Connection connection = 
DriverManager.getConnection(URL_PREFIX + dbPath);
+                 PreparedStatement statement = 
connection.prepareStatement("DELETE FROM hzb_history WHERE record_time < ?")) {
+                statement.setLong(1, expireTime);
+                int rows = statement.executeUpdate();
+                if (rows > 0) {
+                    log.info("[duckdb] delete {} expired records.", rows);
+                }
+            } catch (Exception e) {
+                log.error("[duckdb] clean expired data error: {}", 
e.getMessage(), e);
+            }
+        }, 5, 60, TimeUnit.MINUTES);
+    }
+
+    @Override
+    public void saveData(CollectRep.MetricsData metricsData) {
+        if (!isServerAvailable() || metricsData.getCode() != 
CollectRep.Code.SUCCESS || metricsData.getValues().isEmpty()) {
+            return;
+        }
+
+        String monitorType = metricsData.getApp();
+        String metrics = metricsData.getMetrics();
+        String insertSql = "INSERT INTO hzb_history VALUES (?, ?, ?, ?, ?, ?, 
?, ?, ?, ?)";
+
+        try (Connection connection = DriverManager.getConnection(URL_PREFIX + 
dbPath);
+             PreparedStatement preparedStatement = 
connection.prepareStatement(insertSql)) {
+
+            RowWrapper rowWrapper = metricsData.readRow();
+            Map<String, String> labels = new HashMap<>();
+
+            while (rowWrapper.hasNextRow()) {
+                rowWrapper = rowWrapper.nextRow();
+                long time = metricsData.getTime();
+
+                // First pass: collect labels
+                rowWrapper.cellStream().forEach(cell -> {
+                    if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL)) {
+                        labels.put(cell.getField().getName(), cell.getValue());
+                    }
+                });
+                String labelsJson = JsonUtil.toJson(labels);
+
+                // Second pass: insert data
+                rowWrapper.cellStream().forEach(cell -> {
+                    try {
+                        String metric = cell.getField().getName();
+                        String columnValue = cell.getValue();
+                        int fieldType = 
cell.getMetadataAsInteger(MetricDataConstants.TYPE);
+
+                        preparedStatement.setString(1, 
metricsData.getInstance());
+                        preparedStatement.setString(2, monitorType);
+                        preparedStatement.setString(3, metrics);
+                        preparedStatement.setString(4, metric);
+
+                        if (CommonConstants.NULL_VALUE.equals(columnValue)) {
+                            preparedStatement.setShort(5, (short) 
CommonConstants.TYPE_NUMBER);
+                            preparedStatement.setObject(6, null);
+                            preparedStatement.setObject(7, null);
+                            preparedStatement.setObject(8, null);
+                        } else {
+                            switch (fieldType) {
+                                case CommonConstants.TYPE_STRING -> {
+                                    preparedStatement.setShort(5, (short) 
CommonConstants.TYPE_STRING);
+                                    preparedStatement.setObject(6, null);
+                                    preparedStatement.setObject(7, null);
+                                    preparedStatement.setString(8, 
columnValue);
+                                }
+                                case CommonConstants.TYPE_TIME -> {
+                                    preparedStatement.setShort(5, (short) 
CommonConstants.TYPE_TIME);
+                                    try {
+                                        preparedStatement.setInt(6, 
Integer.parseInt(columnValue));
+                                    } catch (NumberFormatException nfe) {
+                                        log.warn("Failed to parse columnValue 
'{}' as integer for metric '{}'. Setting value to null.", columnValue, metric, 
nfe);
+                                        preparedStatement.setObject(6, null);
+                                    }
+                                    preparedStatement.setObject(7, null);
+                                    preparedStatement.setObject(8, null);
+                                }
+                                default -> {
+                                    preparedStatement.setShort(5, (short) 
CommonConstants.TYPE_NUMBER);
+                                    preparedStatement.setObject(6, null);
+                                    double v = Double.parseDouble(columnValue);
+                                    preparedStatement.setDouble(7, v);
+                                    preparedStatement.setObject(8, null);
+                                }
+                            }
+                        }
+                        preparedStatement.setLong(9, time);
+                        preparedStatement.setString(10, labelsJson);
+                        preparedStatement.addBatch();
+                    } catch (SQLException e) {
+                        log.error("error setting prepared statement", e);
+                    }
+                });
+                labels.clear();
+            }
+            preparedStatement.executeBatch();
+        } catch (Exception e) {
+            log.error("[duckdb] save data error: {}", e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Map<String, List<Value>> getHistoryMetricData(String instance, 
String app, String metrics, String metric, String history) {
+        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
+        if (!isServerAvailable()) {
+            return instanceValuesMap;
+        }
+
+        // Raw data query - limit to avoid memory issues
+        StringBuilder sqlBuilder = new StringBuilder("SELECT record_time, 
metric_type, int32_value, double_value,"
+                + " str_value, labels FROM hzb_history WHERE instance = ? AND 
app = ? AND metrics = ? AND metric = ?");
+
+        long timeBefore = 0;
+        if (history != null) {
+            try {
+                TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
+                ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
+                timeBefore = dateTime.toEpochSecond() * 1000L;
+                sqlBuilder.append(" AND record_time >= ?");
+            } catch (Exception e) {
+                log.error("parse history time error: {}", e.getMessage());
+            }
+        }
+        sqlBuilder.append(" ORDER BY record_time DESC LIMIT 20000"); // Add 
safety limit for raw data
+
+        try (Connection connection = DriverManager.getConnection(URL_PREFIX + 
dbPath);
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sqlBuilder.toString())) {
+
+            preparedStatement.setString(1, instance);
+            preparedStatement.setString(2, app);
+            preparedStatement.setString(3, metrics);
+            preparedStatement.setString(4, metric);
+            if (timeBefore > 0) {
+                preparedStatement.setLong(5, timeBefore);
+            }
+
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    long time = resultSet.getLong("record_time");
+                    int type = resultSet.getShort("metric_type");
+                    String labels = resultSet.getString("labels");
+                    String value = formatValue(type, resultSet);
+
+                    List<Value> valueList = 
instanceValuesMap.computeIfAbsent(labels == null ? "" : labels, k -> new 
LinkedList<>());
+                    valueList.add(new Value(value, time));
+                }
+            }
+        } catch (SQLException e) {
+            log.error("[duckdb] query data error: {}", e.getMessage(), e);
+        }
+        return instanceValuesMap;
+    }
+
+    @Override
+    public Map<String, List<Value>> getHistoryIntervalMetricData(String 
instance, String app, String metrics, String metric, String history) {
+        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
+        if (!isServerAvailable()) {
+            return instanceValuesMap;
+        }
+
+        long timeBefore = 0;
+        long endTime = System.currentTimeMillis();
+        long interval = 0;
+
+        if (history != null) {
+            try {
+                TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
+                ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
+                timeBefore = dateTime.toEpochSecond() * 1000L;
+                // Calculate bucket interval based on desired target points 
(e.g. 800 points on chart)
+                interval = (endTime - timeBefore) / TARGET_CHART_POINTS;
+                // Minimum interval 1 minute
+                if (interval < 60000) {
+                    interval = 60000;
+                }
+            } catch (Exception e) {
+                log.error("parse history time error: {}", e.getMessage());
+                // Fallback defaults
+                timeBefore = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
+                interval = 60 * 1000L;
+            }
+        }
+
+        // DuckDB Aggregation Query: Group by time bucket
+        // We use casting to integer division for bucketing: (time / interval) 
* interval
+        String sql = """
+                SELECT
+                CAST(record_time / ? AS BIGINT) * ? AS ts_bucket,
+                AVG(double_value) AS avg_val,
+                MIN(double_value) AS min_val,
+                MAX(double_value) AS max_val,
+                FIRST(str_value) AS str_val,
+                metric_type, labels
+                FROM hzb_history
+                WHERE instance = ? AND app = ? AND metrics = ? AND metric = ? 
AND record_time >= ?
+                GROUP BY ts_bucket, metric_type, labels
+                ORDER BY ts_bucket""";
+
+        try (Connection connection = DriverManager.getConnection(URL_PREFIX + 
dbPath);
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+
+            preparedStatement.setLong(1, interval);
+            preparedStatement.setLong(2, interval);
+            preparedStatement.setString(3, instance);
+            preparedStatement.setString(4, app);
+            preparedStatement.setString(5, metrics);
+            preparedStatement.setString(6, metric);
+            preparedStatement.setLong(7, timeBefore);
+
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    long time = resultSet.getLong("ts_bucket");
+                    int type = resultSet.getShort("metric_type");
+                    String labels = resultSet.getString("labels");
+
+                    Value valueObj;
+                    if (type == CommonConstants.TYPE_NUMBER) {
+                        double avg = resultSet.getDouble("avg_val");
+                        double min = resultSet.getDouble("min_val");
+                        double max = resultSet.getDouble("max_val");
+                        if (resultSet.wasNull()) {
+                            valueObj = new Value(null, time);
+                        } else {
+                            String avgStr = 
BigDecimal.valueOf(avg).setScale(4, 
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+                            String minStr = 
BigDecimal.valueOf(min).setScale(4, 
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+                            String maxStr = 
BigDecimal.valueOf(max).setScale(4, 
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+                            valueObj = new Value(avgStr, time);
+                            valueObj.setMean(avgStr);
+                            valueObj.setMin(minStr);
+                            valueObj.setMax(maxStr);
+                        }
+                    } else {
+                        // For non-numeric, we just took the FIRST value in 
the bucket
+                        String strVal = resultSet.getString("str_val");
+                        valueObj = new Value(strVal, time);
+                    }
+
+                    List<Value> valueList = 
instanceValuesMap.computeIfAbsent(labels == null ? "" : labels, k -> new 
LinkedList<>());
+                    valueList.add(valueObj);
+                }
+            }
+        } catch (SQLException e) {
+            log.error("[duckdb] query interval data error: {}", 
e.getMessage(), e);
+        }
+        return instanceValuesMap;
+    }
+
+    private String formatValue(int type, ResultSet resultSet) throws 
SQLException {
+        if (type == CommonConstants.TYPE_NUMBER) {
+            double v = resultSet.getDouble("double_value");
+            if (!resultSet.wasNull()) {
+                return BigDecimal.valueOf(v).setScale(4, 
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+            }
+        } else if (type == CommonConstants.TYPE_TIME) {
+            int v = resultSet.getInt("int32_value");
+            if (!resultSet.wasNull()) {
+                return String.valueOf(v);
+            }
+        } else {
+            return resultSet.getString("str_value");
+        }
+        return "";
+    }
+
+    @Override
+    public void destroy() throws Exception {
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaProperties.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbProperties.java
similarity index 73%
rename from 
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaProperties.java
rename to 
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbProperties.java
index 0f8c586815..40fba5fe18 100644
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaProperties.java
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/duckdb/DuckdbProperties.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.hertzbeat.warehouse.store.history.tsdb.jpa;
+package org.apache.hertzbeat.warehouse.store.history.tsdb.duckdb;
 
 import org.apache.hertzbeat.common.constants.ConfigConstants;
 import org.apache.hertzbeat.common.constants.SignConstants;
@@ -24,18 +24,16 @@ import 
org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.bind.DefaultValue;
 
 /**
- * JPA configuration information
- * @param enabled use mysql/h2 jpa store metrics history data
+ * Duckdb configuration information
+ * @param enabled use duckdb store metrics history data
  * @param expireTime save data expire time(ms)
- * @param maxHistoryRecordNum The maximum number of history records retained
  */
 
 @ConfigurationProperties(prefix = 
ConfigConstants.FunctionModuleConstants.WAREHOUSE
                + SignConstants.DOT
                + WarehouseConstants.STORE
                + SignConstants.DOT
-               + WarehouseConstants.HistoryName.JPA)
-public record JpaProperties(@DefaultValue("true") boolean enabled,
-                            @DefaultValue("1h") String expireTime,
-                            @DefaultValue("20000") Integer 
maxHistoryRecordNum) {
+               + WarehouseConstants.HistoryName.DUCKDB)
+public record DuckdbProperties(@DefaultValue("true") boolean enabled,
+                                @DefaultValue("90d") String expireTime) {
 }
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaDatabaseDataStorage.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaDatabaseDataStorage.java
deleted file mode 100644
index 6c31ab9a1c..0000000000
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/jpa/JpaDatabaseDataStorage.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.warehouse.store.history.tsdb.jpa;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import jakarta.persistence.criteria.Predicate;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.constants.MetricDataConstants;
-import org.apache.hertzbeat.common.entity.arrow.ArrowCell;
-import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
-import org.apache.hertzbeat.common.entity.dto.Value;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
-import org.apache.hertzbeat.common.entity.warehouse.History;
-import org.apache.hertzbeat.common.util.JsonUtil;
-import org.apache.hertzbeat.common.util.TimePeriodUtil;
-import org.apache.hertzbeat.warehouse.dao.HistoryDao;
-import 
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.data.domain.Sort;
-import org.springframework.data.jpa.domain.Specification;
-import org.springframework.stereotype.Component;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.time.Duration;
-import java.time.ZonedDateTime;
-import java.time.temporal.TemporalAmount;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.LinkedList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * data storage by mysql/h2/pgsql - jpa
- */
-@Component
-@ConditionalOnProperty(prefix = "warehouse.store.jpa", name = "enabled", 
havingValue = "true")
-@Slf4j
-public class JpaDatabaseDataStorage extends AbstractHistoryDataStorage {
-    private final HistoryDao historyDao;
-    private final JpaProperties jpaProperties;
-
-    private static final int STRING_MAX_LENGTH = 1024;
-
-    public JpaDatabaseDataStorage(JpaProperties jpaProperties,
-                                  HistoryDao historyDao) {
-        this.jpaProperties = jpaProperties;
-        this.serverAvailable = true;
-        this.historyDao = historyDao;
-        expiredDataCleaner();
-    }
-
-    public void expiredDataCleaner() {
-        ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                .setUncaughtExceptionHandler((thread, throwable) -> {
-                    log.error("Jpa metrics store has uncaughtException.");
-                    log.error(throwable.getMessage(), throwable);
-                })
-                .setDaemon(true)
-                .setNameFormat("jpa-metrics-cleaner-%d")
-                .build();
-        ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
-        scheduledExecutor.scheduleAtFixedRate(() -> {
-            log.warn("[jpa-metrics-store]-start running expired data cleaner."
-                    + "Please use time series db instead of jpa for better 
performance");
-            String expireTimeStr = jpaProperties.expireTime();
-            long expireTime;
-            try {
-                if (NumberUtils.isParsable(expireTimeStr)) {
-                    expireTime = NumberUtils.toLong(expireTimeStr);
-                    expireTime = (ZonedDateTime.now().toEpochSecond() + 
expireTime) * 1000L;
-                } else {
-                    TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(expireTimeStr);
-                    ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
-                    expireTime = dateTime.toEpochSecond() * 1000L;
-                }
-            } catch (Exception e) {
-                log.error("expiredDataCleaner time error: {}. use default 
expire time to clean: 1h", e.getMessage());
-                ZonedDateTime dateTime = 
ZonedDateTime.now().minus(Duration.ofHours(1));
-                expireTime = dateTime.toEpochSecond() * 1000L;
-            }
-            try {
-                int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
-                log.info("[jpa-metrics-store]-delete {} rows.", rows);
-                long total = historyDao.count();
-                if (total > jpaProperties.maxHistoryRecordNum()) {
-                    rows = 
historyDao.deleteOlderHistoriesRecord(jpaProperties.maxHistoryRecordNum() / 2);
-                    log.warn("[jpa-metrics-store]-force delete {} rows due too 
many. Please use time series db instead of jpa for better performance.", rows);
-                }
-            } catch (Exception e) {
-                log.error("expiredDataCleaner database error: {}.", 
e.getMessage());
-                log.error("try to truncate table hzb_history. Please use time 
series db instead of jpa for better performance.");
-                historyDao.truncateTable();
-            }
-        }, 5, 30, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public void saveData(CollectRep.MetricsData metricsData) {
-        if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
-            return;
-        }
-        if (metricsData.getValues().isEmpty()) {
-            log.info("[warehouse jpa] flush metrics data {} is null, ignore.", 
metricsData.getInstance());
-            return;
-        }
-        String monitorType = metricsData.getApp();
-        String metrics = metricsData.getMetrics();
-
-        try {
-            List<History> allHistoryList = Lists.newArrayList();
-            Map<String, String> labels = Maps.newHashMapWithExpectedSize(8);
-            RowWrapper rowWrapper = metricsData.readRow();
-
-            while (rowWrapper.hasNextRow()) {
-                rowWrapper = rowWrapper.nextRow();
-                List<History> singleHistoryList = new ArrayList<>();
-
-                rowWrapper.cellStream().forEach(cell -> 
singleHistoryList.add(buildHistory(metricsData, cell, monitorType, metrics, 
labels)));
-                singleHistoryList.forEach(history -> 
history.setMetricLabels(JsonUtil.toJson(labels)));
-
-                allHistoryList.addAll(singleHistoryList);
-            }
-
-            historyDao.saveAll(allHistoryList);
-        } catch (Exception e) {
-            log.error(e.getMessage(), e);
-        }
-    }
-
-    private History buildHistory(CollectRep.MetricsData metricsData, ArrowCell 
cell, String monitorType, String metrics, Map<String, String> labels) {
-        History.HistoryBuilder historyBuilder = History.builder()
-                .instance(metricsData.getInstance())
-                .app(monitorType)
-                .metrics(metrics)
-                .time(metricsData.getTime())
-                .metric(cell.getField().getName());
-
-        final String columnValue = cell.getValue();
-        final int fieldType = 
cell.getMetadataAsInteger(MetricDataConstants.TYPE);
-        if (CommonConstants.NULL_VALUE.equals(columnValue)) {
-            switch (fieldType) {
-                case CommonConstants.TYPE_NUMBER ->
-                        historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
-                                .dou(null);
-                case CommonConstants.TYPE_STRING ->
-                        historyBuilder.metricType(CommonConstants.TYPE_STRING)
-                                .str(null);
-                case CommonConstants.TYPE_TIME -> 
historyBuilder.metricType(CommonConstants.TYPE_TIME)
-                        .int32(null);
-                default -> 
historyBuilder.metricType(CommonConstants.TYPE_NUMBER);
-            }
-        } else {
-            switch (fieldType) {
-                case CommonConstants.TYPE_STRING ->
-                        historyBuilder.metricType(CommonConstants.TYPE_STRING)
-                                .str(formatStrValue(columnValue));
-                case CommonConstants.TYPE_TIME -> 
historyBuilder.metricType(CommonConstants.TYPE_TIME)
-                        .int32(Integer.parseInt(columnValue));
-                default -> {
-                    Double v = Double.parseDouble(columnValue);
-                    v = v.isNaN() ? null : v;
-                    historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
-                            .dou(v);
-                }
-            }
-
-            if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL)) {
-                labels.put(cell.getField().getName(), columnValue);
-            }
-        }
-
-        return historyBuilder.build();
-    }
-
-    @Override
-    public Map<String, List<Value>> getHistoryMetricData(String instance, 
String app, String metrics, String metric, String history) {
-        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
-        Specification<History> specification = (root, query, criteriaBuilder) 
-> {
-            List<Predicate> andList = new ArrayList<>();
-            Predicate predicateInstance = 
criteriaBuilder.equal(root.get("instance"), instance);
-            Predicate predicateMonitorType = 
criteriaBuilder.equal(root.get("app"), app);
-            if (CommonConstants.PROMETHEUS.equals(app)) {
-                predicateMonitorType = criteriaBuilder.like(root.get("app"), 
CommonConstants.PROMETHEUS_APP_PREFIX + "%");
-            }
-            Predicate predicateMonitorMetrics = 
criteriaBuilder.equal(root.get("metrics"), metrics);
-            Predicate predicateMonitorMetric = 
criteriaBuilder.equal(root.get("metric"), metric);
-            andList.add(predicateInstance);
-            andList.add(predicateMonitorType);
-            andList.add(predicateMonitorMetrics);
-            andList.add(predicateMonitorMetric);
-
-            if (history != null) {
-                try {
-                    TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
-                    ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
-                    long timeBefore = dateTime.toEpochSecond() * 1000L;
-                    Predicate timePredicate = 
criteriaBuilder.ge(root.get("time"), timeBefore);
-                    andList.add(timePredicate);
-                } catch (Exception e) {
-                    log.error(e.getMessage());
-                }
-            }
-            Predicate[] predicates = new Predicate[andList.size()];
-            Predicate predicate = 
criteriaBuilder.and(andList.toArray(predicates));
-            return query.where(predicate).getRestriction();
-        };
-        Sort sortExp = Sort.by(new Sort.Order(Sort.Direction.DESC, "time"));
-        List<History> historyList = historyDao.findAll(specification, sortExp);
-        for (History dataItem : historyList) {
-            String value = "";
-            if (dataItem.getMetricType() == CommonConstants.TYPE_NUMBER) {
-                if (dataItem.getDou() != null) {
-                    value = BigDecimal.valueOf(dataItem.getDou()).setScale(4, 
RoundingMode.HALF_UP)
-                            .stripTrailingZeros().toPlainString();
-                }
-            } else {
-                value = dataItem.getStr();
-            }
-            String instanceValue = dataItem.getMetricLabels() == null ? "" : 
dataItem.getMetricLabels();
-            List<Value> valueList = 
instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>());
-            valueList.add(new Value(value, dataItem.getTime()));
-        }
-        return instanceValuesMap;
-    }
-
-    private String formatStrValue(String value) {
-        if (value == null) {
-            return "";
-        }
-        value = value.replace("'", "\\'");
-        value = value.replace("\"", "\\\"");
-        value = value.replace("*", "-");
-        value = String.format("`%s`", value);
-        if (value.length() > STRING_MAX_LENGTH) {
-            value = value.substring(0, STRING_MAX_LENGTH - 1);
-        }
-        return value;
-    }
-
-    @Override
-    public Map<String, List<Value>> getHistoryIntervalMetricData(String 
instance, String app, String metrics, String metric, String history) {
-        return new HashMap<>(8);
-    }
-
-    @Override
-    public void destroy() throws Exception {
-    }
-}
diff --git a/material/licenses/LICENSE b/material/licenses/LICENSE
index 85eeac878b..cbdcda866a 100644
--- a/material/licenses/LICENSE
+++ b/material/licenses/LICENSE
@@ -541,6 +541,7 @@ The text of each license is also included in 
licenses/LICENSE-[project].txt.
     https://mvnrepository.com/artifact/com.taosdata.jdbc/taos-jdbcdriver/3.0.0
     https://mvnrepository.com/artifact/com.beetstra.jutf7/jutf7/1.0.0
     https://mvnrepository.com/artifact/org.testcontainers/kafka/1.20.2 MIT
+    https://mvnrepository.com/artifact/org.duckdb/duckdb_jdbc/1.4.2.0 MIT
 
 ========================================================================
 MPL-1.1 licenses
diff --git a/pom.xml b/pom.xml
index c8d5d2aa65..5a3904365f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,6 +169,7 @@
         <commons-net>3.8.0</commons-net>
         <zookeeper.version>3.9.3</zookeeper.version>
 
+        <duckdb.version>1.4.2.0</duckdb.version>
         <iotdb-session.version>0.13.3</iotdb-session.version>
         <influxdb.version>2.23</influxdb.version>
         
<spring-cloud-starter-openfeign.version>3.0.5</spring-cloud-starter-openfeign.version>
diff --git a/script/application.yml b/script/application.yml
index e141b4606c..ea661edf93 100644
--- a/script/application.yml
+++ b/script/application.yml
@@ -169,13 +169,10 @@ common:
 warehouse:
   store:
     # store history metrics data, enable only one below
-    jpa:
+    duckdb:
       enabled: true
       # The maximum retention time for history records, after which records 
will be deleted
-      expire-time: 1h
-      # The maximum number of history records retained, if this number is 
exceeded, half of the data in this configuration item will be deleted
-      # (please set this configuration reasonably as history records can 
affect performance when it is large)
-      max-history-record-num: 6000
+      expire-time: 90d
     victoria-metrics:
       # Standalone mode toggle — must be set to false when using cluster mode
       enabled: false


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to