This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a2bbb5104b8 Fix that compression ratio is not transferred during 
region migration (#16352)
a2bbb5104b8 is described below

commit a2bbb5104b8552cd5c73af1cfd2e367e88b839b3
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Sep 8 16:03:33 2025 +0800

    Fix that compression ratio is not transferred during region migration 
(#16352)
---
 .../IoTDBRegionMigrateWithCompressionRatioIT.java  | 130 +++++++++++++++++++++
 .../dataregion/flush/CompressionRatio.java         |  19 ++-
 .../dataregion/snapshot/SnapshotLoader.java        |  42 ++++++-
 .../dataregion/snapshot/SnapshotTaker.java         |  27 +++++
 .../dataregion/snapshot/IoTDBSnapshotTest.java     |   8 ++
 5 files changed, 224 insertions(+), 2 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
new file mode 100644
index 00000000000..e9fddd2d919
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static 
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+public class IoTDBRegionMigrateWithCompressionRatioIT {
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(2)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testWithCompressionRatio() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE test");
+      statement.execute("USE test");
+      statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+      statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
+      statement.execute("FLUSH");
+
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+          getDataRegionMapWithLeader(statement);
+      int dataRegionIdForTest =
+          
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
+
+      Pair<Integer, Set<Integer>> leaderAndNodes = 
dataRegionMapWithLeader.get(dataRegionIdForTest);
+      Set<Integer> allDataNodes = getAllDataNodes(statement);
+      int leaderId = leaderAndNodes.getLeft();
+      int followerId =
+          leaderAndNodes.getRight().stream().filter(i -> i != 
leaderId).findAny().get();
+      int newLeaderId =
+          allDataNodes.stream().filter(i -> i != leaderId && i != 
followerId).findAny().get();
+
+      System.out.printf(
+          "Old leader: %d, follower: %d, new leader: %d%n", leaderId, 
followerId, newLeaderId);
+
+      double[] compressionRatioBeforeMigration = new double[] {Double.NaN};
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try (ResultSet showRegions = statement.executeQuery("SHOW 
REGIONS")) {
+                  while (showRegions.next()) {
+                    int regionId = showRegions.getInt("RegionId");
+                    int dataNodeId = showRegions.getInt("DataNodeId");
+                    if (regionId == dataRegionIdForTest && dataNodeId == 
leaderId) {
+                      compressionRatioBeforeMigration[0] =
+                          showRegions.getDouble("CompressionRatio");
+                      break;
+                    }
+                  }
+                }
+                
Assert.assertFalse(Double.isNaN(compressionRatioBeforeMigration[0]));
+              });
+
+      statement.execute(
+          String.format(
+              "migrate region %d from %d to %d", dataRegionIdForTest, 
leaderId, newLeaderId));
+
+      double finalCompressionRatioBeforeMigration = 
compressionRatioBeforeMigration[0];
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                double compressionRatioAfterMigration = 0.0;
+                try (ResultSet showRegions = statement.executeQuery("SHOW 
REGIONS")) {
+                  while (showRegions.next()) {
+                    int regionId = showRegions.getInt("RegionId");
+                    int dataNodeId = showRegions.getInt("DataNodeId");
+                    if (regionId == dataRegionIdForTest && dataNodeId == 
newLeaderId) {
+                      compressionRatioAfterMigration = 
showRegions.getDouble("CompressionRatio");
+                      break;
+                    }
+                  }
+                }
+                Assert.assertEquals(
+                    finalCompressionRatioBeforeMigration, 
compressionRatioAfterMigration, 0.0001);
+              });
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
index 724f4d0decf..a56b895a586 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -316,7 +316,7 @@ public class CompressionRatio {
   }
 
   @TestOnly
-  void reset() throws IOException {
+  public void reset() throws IOException {
     if (!directory.exists()) {
       return;
     }
@@ -328,9 +328,26 @@ public class CompressionRatio {
       Files.delete(file.toPath());
     }
     totalMemorySize = new AtomicLong(0);
+    dataRegionRatioMap.clear();
     totalDiskSize = 0L;
   }
 
+  public synchronized File getCompressionRatioFile(String dataRegionId) {
+    Pair<Long, Long> dataRegionCompressionRatio = 
dataRegionRatioMap.get(dataRegionId);
+    if (dataRegionCompressionRatio == null) {
+      return null;
+    }
+    return SystemFileFactory.INSTANCE.getFile(
+        directory,
+        String.format(
+                Locale.ENGLISH,
+                RATIO_FILE_PATH_FORMAT,
+                dataRegionCompressionRatio.getLeft(),
+                dataRegionCompressionRatio.getRight())
+            + "."
+            + dataRegionId);
+  }
+
   public Map<String, Pair<Long, Long>> getDataRegionRatioMap() {
     return dataRegionRatioMap;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
index 5c5c8e490ce..daa64274b12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 
@@ -121,7 +122,9 @@ public class SnapshotLoader {
         return null;
       }
       LOGGER.info("Moving snapshot file to data dirs");
-      createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
+      File snapshotDir = new File(snapshotPath);
+      createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
+      loadCompressionRatio(snapshotDir);
       return loadSnapshot();
     } catch (IOException | DiskSpaceInsufficientException e) {
       LOGGER.error(
@@ -130,6 +133,34 @@ public class SnapshotLoader {
     }
   }
 
+  private void loadCompressionRatio(File snapshotDir) {
+    File[] compressionFiles =
+        snapshotDir.listFiles(f -> 
f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+    if (compressionFiles == null || compressionFiles.length == 0) {
+      LOGGER.info("No compression ratio file in dir {}", snapshotPath);
+      return;
+    }
+    File ratioFile = compressionFiles[0];
+    String fileName = ratioFile.getName();
+    String ratioPart = fileName.substring(0, fileName.lastIndexOf("."));
+    String dataRegionId = fileName.substring(fileName.lastIndexOf(".") + 1);
+
+    String[] fileNameArray = ratioPart.split("-");
+    // fileNameArray.length != 3 means the compression ratio may be negative, 
ignore it
+    if (fileNameArray.length == 3) {
+      try {
+        long rawSize = Long.parseLong(fileNameArray[1]);
+        long diskSize = Long.parseLong(fileNameArray[2]);
+        CompressionRatio.getInstance().updateRatio(rawSize, diskSize, 
dataRegionId);
+      } catch (NumberFormatException ignore) {
+        // ignore illegal compression file name
+      } catch (IOException e) {
+        LOGGER.warn("Cannot load compression ratio from {}", ratioFile, e);
+      }
+    }
+    LOGGER.info("Loaded compression ratio from {}", ratioFile);
+  }
+
   private DataRegion loadSnapshotWithLog(File logFile) {
     boolean snapshotComplete = false;
     try {
@@ -151,6 +182,7 @@ public class SnapshotLoader {
         deleteAllFilesInDataDirs();
         LOGGER.info("Remove all data files in original data dir");
         createLinksFromSnapshotDirToDataDirWithLog();
+        loadCompressionRatio(new File(snapshotPath));
         return loadSnapshot();
       } catch (IOException e) {
         LOGGER.error("Failed to remove origin data files", e);
@@ -497,6 +529,14 @@ public class SnapshotLoader {
                 + snapshotId;
         fileList.addAll(searchDataFilesRecursively(snapshotDir));
       }
+
+      File[] compressionRatioFiles =
+          logFile
+              .getParentFile()
+              .listFiles(f -> 
f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+      if (compressionRatioFiles != null) {
+        fileList.addAll(Arrays.asList(compressionRatioFiles));
+      }
       return fileList;
     } finally {
       analyzer.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
index e60d7e14ca1..dde04c10952 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DirectoryNotLegalException;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -100,6 +101,7 @@ public class SnapshotTaker {
         }
         success = createSnapshot(seqFiles, tempSnapshotId);
         success = success && createSnapshot(unseqFiles, tempSnapshotId);
+        success = success && snapshotCompressionRatio(snapshotDirPath);
       } finally {
         readUnlockTheFile();
       }
@@ -136,6 +138,31 @@ public class SnapshotTaker {
     }
   }
 
+  private boolean snapshotCompressionRatio(String snapshotDir) {
+    File compressionRatioFile =
+        
CompressionRatio.getInstance().getCompressionRatioFile(dataRegion.getDataRegionId());
+    if (compressionRatioFile != null) {
+      LOGGER.info("Snapshotting compression ratio {}.", 
compressionRatioFile.getName());
+      try {
+        File snapshotFile = new File(snapshotDir, 
compressionRatioFile.getName());
+        if (snapshotFile.createNewFile()) {
+          // write one byte so that it will not be skipped
+          Files.write(snapshotFile.toPath(), new byte[1]);
+          LOGGER.info(
+              "Snapshot compression ratio {} in {}.", 
compressionRatioFile.getName(), snapshotDir);
+          return true;
+        }
+      } catch (IOException ignored) {
+        LOGGER.warn(
+            "Cannot snapshot compression ratio {} in {}.",
+            compressionRatioFile.getName(),
+            snapshotDir);
+      }
+      return false;
+    }
+    return true;
+  }
+
   public boolean cleanSnapshot() {
     return clearSnapshotOfDataRegion(this.dataRegion);
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
index 00bbc38cf23..6bec0df050b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.DirectoryNotLegalException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
 
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -179,18 +181,24 @@ public class IoTDBSnapshotTest {
     try {
       List<TsFileResource> resources = writeTsFiles();
       DataRegion region = new DataRegion(testSgName, "0");
+      CompressionRatio.getInstance().updateRatio(100, 100, "0");
       region.getTsFileManager().addAll(resources, true);
       File snapshotDir = new File("target" + File.separator + "snapshot");
       Assert.assertTrue(snapshotDir.exists() || snapshotDir.mkdirs());
       try {
         Assert.assertTrue(
             new 
SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+        CompressionRatio.getInstance().reset();
+
         DataRegion dataRegion =
             new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0")
                 .loadSnapshotForStateMachine();
         Assert.assertNotNull(dataRegion);
         List<TsFileResource> resource = 
dataRegion.getTsFileManager().getTsFileList(true);
         Assert.assertEquals(100, resource.size());
+        Assert.assertEquals(
+            new Pair<>(100L, 100L),
+            CompressionRatio.getInstance().getDataRegionRatioMap().get("0"));
       } finally {
         FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
       }

Reply via email to