This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a2bbb5104b8 Fix that compression ratio is not transferred during
region migration (#16352)
a2bbb5104b8 is described below
commit a2bbb5104b8552cd5c73af1cfd2e367e88b839b3
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Sep 8 16:03:33 2025 +0800
Fix that compression ratio is not transferred during region migration
(#16352)
---
.../IoTDBRegionMigrateWithCompressionRatioIT.java | 130 +++++++++++++++++++++
.../dataregion/flush/CompressionRatio.java | 19 ++-
.../dataregion/snapshot/SnapshotLoader.java | 42 ++++++-
.../dataregion/snapshot/SnapshotTaker.java | 27 +++++
.../dataregion/snapshot/IoTDBSnapshotTest.java | 8 ++
5 files changed, 224 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
new file mode 100644
index 00000000000..e9fddd2d919
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+public class IoTDBRegionMigrateWithCompressionRatioIT {
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(2)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testWithCompressionRatio() throws Exception {
+ try (Connection connection = EnvFactory.getEnv().getTableConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE test");
+ statement.execute("USE test");
+ statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+ statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
+ statement.execute("FLUSH");
+
+ Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+ getDataRegionMapWithLeader(statement);
+ int dataRegionIdForTest =
+
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
+
+ Pair<Integer, Set<Integer>> leaderAndNodes =
dataRegionMapWithLeader.get(dataRegionIdForTest);
+ Set<Integer> allDataNodes = getAllDataNodes(statement);
+ int leaderId = leaderAndNodes.getLeft();
+ int followerId =
+ leaderAndNodes.getRight().stream().filter(i -> i !=
leaderId).findAny().get();
+ int newLeaderId =
+ allDataNodes.stream().filter(i -> i != leaderId && i !=
followerId).findAny().get();
+
+ System.out.printf(
+ "Old leader: %d, follower: %d, new leader: %d%n", leaderId,
followerId, newLeaderId);
+
+ double[] compressionRatioBeforeMigration = new double[] {Double.NaN};
+ Awaitility.await()
+ .atMost(10, TimeUnit.MINUTES)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (ResultSet showRegions = statement.executeQuery("SHOW
REGIONS")) {
+ while (showRegions.next()) {
+ int regionId = showRegions.getInt("RegionId");
+ int dataNodeId = showRegions.getInt("DataNodeId");
+ if (regionId == dataRegionIdForTest && dataNodeId ==
leaderId) {
+ compressionRatioBeforeMigration[0] =
+ showRegions.getDouble("CompressionRatio");
+ break;
+ }
+ }
+ }
+
Assert.assertFalse(Double.isNaN(compressionRatioBeforeMigration[0]));
+ });
+
+ statement.execute(
+ String.format(
+ "migrate region %d from %d to %d", dataRegionIdForTest,
leaderId, newLeaderId));
+
+ double finalCompressionRatioBeforeMigration =
compressionRatioBeforeMigration[0];
+ Awaitility.await()
+ .atMost(10, TimeUnit.MINUTES)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ double compressionRatioAfterMigration = 0.0;
+ try (ResultSet showRegions = statement.executeQuery("SHOW
REGIONS")) {
+ while (showRegions.next()) {
+ int regionId = showRegions.getInt("RegionId");
+ int dataNodeId = showRegions.getInt("DataNodeId");
+ if (regionId == dataRegionIdForTest && dataNodeId ==
newLeaderId) {
+ compressionRatioAfterMigration =
showRegions.getDouble("CompressionRatio");
+ break;
+ }
+ }
+ }
+ Assert.assertEquals(
+ finalCompressionRatioBeforeMigration,
compressionRatioAfterMigration, 0.0001);
+ });
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
index 724f4d0decf..a56b895a586 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -316,7 +316,7 @@ public class CompressionRatio {
}
@TestOnly
- void reset() throws IOException {
+ public void reset() throws IOException {
if (!directory.exists()) {
return;
}
@@ -328,9 +328,26 @@ public class CompressionRatio {
Files.delete(file.toPath());
}
totalMemorySize = new AtomicLong(0);
+ dataRegionRatioMap.clear();
totalDiskSize = 0L;
}
+ public synchronized File getCompressionRatioFile(String dataRegionId) {
+ Pair<Long, Long> dataRegionCompressionRatio =
dataRegionRatioMap.get(dataRegionId);
+ if (dataRegionCompressionRatio == null) {
+ return null;
+ }
+ return SystemFileFactory.INSTANCE.getFile(
+ directory,
+ String.format(
+ Locale.ENGLISH,
+ RATIO_FILE_PATH_FORMAT,
+ dataRegionCompressionRatio.getLeft(),
+ dataRegionCompressionRatio.getRight())
+ + "."
+ + dataRegionId);
+ }
+
public Map<String, Pair<Long, Long>> getDataRegionRatioMap() {
return dataRegionRatioMap;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
index 5c5c8e490ce..daa64274b12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -121,7 +122,9 @@ public class SnapshotLoader {
return null;
}
LOGGER.info("Moving snapshot file to data dirs");
- createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
+ File snapshotDir = new File(snapshotPath);
+ createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
+ loadCompressionRatio(snapshotDir);
return loadSnapshot();
} catch (IOException | DiskSpaceInsufficientException e) {
LOGGER.error(
@@ -130,6 +133,34 @@ public class SnapshotLoader {
}
}
+ private void loadCompressionRatio(File snapshotDir) {
+ File[] compressionFiles =
+ snapshotDir.listFiles(f ->
f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+ if (compressionFiles == null || compressionFiles.length == 0) {
+ LOGGER.info("No compression ratio file in dir {}", snapshotPath);
+ return;
+ }
+ File ratioFile = compressionFiles[0];
+ String fileName = ratioFile.getName();
+ String ratioPart = fileName.substring(0, fileName.lastIndexOf("."));
+ String dataRegionId = fileName.substring(fileName.lastIndexOf(".") + 1);
+
+ String[] fileNameArray = ratioPart.split("-");
+ // fileNameArray.length != 3 means the compression ratio may be negative,
ignore it
+ if (fileNameArray.length == 3) {
+ try {
+ long rawSize = Long.parseLong(fileNameArray[1]);
+ long diskSize = Long.parseLong(fileNameArray[2]);
+ CompressionRatio.getInstance().updateRatio(rawSize, diskSize,
dataRegionId);
+ } catch (NumberFormatException ignore) {
+ // ignore illegal compression file name
+ } catch (IOException e) {
+ LOGGER.warn("Cannot load compression ratio from {}", ratioFile, e);
+ }
+ }
+ LOGGER.info("Loaded compression ratio from {}", ratioFile);
+ }
+
private DataRegion loadSnapshotWithLog(File logFile) {
boolean snapshotComplete = false;
try {
@@ -151,6 +182,7 @@ public class SnapshotLoader {
deleteAllFilesInDataDirs();
LOGGER.info("Remove all data files in original data dir");
createLinksFromSnapshotDirToDataDirWithLog();
+ loadCompressionRatio(new File(snapshotPath));
return loadSnapshot();
} catch (IOException e) {
LOGGER.error("Failed to remove origin data files", e);
@@ -497,6 +529,14 @@ public class SnapshotLoader {
+ snapshotId;
fileList.addAll(searchDataFilesRecursively(snapshotDir));
}
+
+ File[] compressionRatioFiles =
+ logFile
+ .getParentFile()
+ .listFiles(f ->
f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+ if (compressionRatioFiles != null) {
+ fileList.addAll(Arrays.asList(compressionRatioFiles));
+ }
return fileList;
} finally {
analyzer.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
index e60d7e14ca1..dde04c10952 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DirectoryNotLegalException;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -100,6 +101,7 @@ public class SnapshotTaker {
}
success = createSnapshot(seqFiles, tempSnapshotId);
success = success && createSnapshot(unseqFiles, tempSnapshotId);
+ success = success && snapshotCompressionRatio(snapshotDirPath);
} finally {
readUnlockTheFile();
}
@@ -136,6 +138,31 @@ public class SnapshotTaker {
}
}
+ private boolean snapshotCompressionRatio(String snapshotDir) {
+ File compressionRatioFile =
+
CompressionRatio.getInstance().getCompressionRatioFile(dataRegion.getDataRegionId());
+ if (compressionRatioFile != null) {
+ LOGGER.info("Snapshotting compression ratio {}.",
compressionRatioFile.getName());
+ try {
+ File snapshotFile = new File(snapshotDir,
compressionRatioFile.getName());
+ if (snapshotFile.createNewFile()) {
+ // write one byte so that it will not be skipped
+ Files.write(snapshotFile.toPath(), new byte[1]);
+ LOGGER.info(
+ "Snapshot compression ratio {} in {}.",
compressionRatioFile.getName(), snapshotDir);
+ return true;
+ }
+ } catch (IOException ignored) {
+ LOGGER.warn(
+ "Cannot snapshot compression ratio {} in {}.",
+ compressionRatioFile.getName(),
+ snapshotDir);
+ }
+ return false;
+ }
+ return true;
+ }
+
public boolean cleanSnapshot() {
return clearSnapshotOfDataRegion(this.dataRegion);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
index 00bbc38cf23..6bec0df050b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.DirectoryNotLegalException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.junit.After;
import org.junit.Assert;
@@ -179,18 +181,24 @@ public class IoTDBSnapshotTest {
try {
List<TsFileResource> resources = writeTsFiles();
DataRegion region = new DataRegion(testSgName, "0");
+ CompressionRatio.getInstance().updateRatio(100, 100, "0");
region.getTsFileManager().addAll(resources, true);
File snapshotDir = new File("target" + File.separator + "snapshot");
Assert.assertTrue(snapshotDir.exists() || snapshotDir.mkdirs());
try {
Assert.assertTrue(
new
SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+ CompressionRatio.getInstance().reset();
+
DataRegion dataRegion =
new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0")
.loadSnapshotForStateMachine();
Assert.assertNotNull(dataRegion);
List<TsFileResource> resource =
dataRegion.getTsFileManager().getTsFileList(true);
Assert.assertEquals(100, resource.size());
+ Assert.assertEquals(
+ new Pair<>(100L, 100L),
+ CompressionRatio.getInstance().getDataRegionRatioMap().get("0"));
} finally {
FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
}