This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/support_schema_evolution by
this push:
new f348433e803 Add SchemaEvolutionFIle
f348433e803 is described below
commit f348433e8036a3a30d173383b06e6f81560cd107
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Nov 27 14:57:47 2025 +0800
Add SchemaEvolutionFIle
---
.../db/storageengine/dataregion/DataRegion.java | 53 +++++----
.../dataregion/tsfile/evolution/ColumnRename.java | 14 +--
.../dataregion/tsfile/evolution/EvolvedSchema.java | 13 +--
.../tsfile/evolution/SchemaEvolution.java | 12 +-
.../tsfile/evolution/SchemaEvolutionFile.java | 18 +--
.../dataregion/tsfile/evolution/TableRename.java | 16 +--
.../dataregion/tsfile/fileset/TsFileSet.java | 10 +-
.../tsfile/evolution/SchemaEvolutionFileTest.java | 123 +++++++++++++++++++++
8 files changed, 189 insertions(+), 70 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 734576d9f0d..cb6803a270a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1002,33 +1002,42 @@ public class DataRegion implements IDataRegionForQuery {
List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
Callable<Void> asyncRecoverTask = null;
for (TsFileResource tsFileResource : resourceList) {
- List<TsFileSet> tsFileSets = tsFileSetMap.computeIfAbsent(partitionId,
- pid -> {
- File fileSetDir = new File(dataRegionSysDir + File.separator +
partitionId + File.separator + TsFileSet.FILE_SET_DIR_NAME);
- File[] fileSets = fileSetDir.listFiles();
- if (fileSets == null || fileSets.length == 0) {
- return Collections.emptyList();
- } else {
- List<TsFileSet> results = new ArrayList<>();
- for (File fileSet : fileSets) {
- TsFileSet tsFileSet;
- try {
- tsFileSet = new TsFileSet(Long.parseLong(fileSet.getName()),
- fileSetDir.getAbsolutePath(), true);
- } catch (NumberFormatException e) {
- continue;
+ List<TsFileSet> tsFileSets =
+ tsFileSetMap.computeIfAbsent(
+ partitionId,
+ pid -> {
+ File fileSetDir =
+ new File(
+ dataRegionSysDir
+ + File.separator
+ + partitionId
+ + File.separator
+ + TsFileSet.FILE_SET_DIR_NAME);
+ File[] fileSets = fileSetDir.listFiles();
+ if (fileSets == null || fileSets.length == 0) {
+ return Collections.emptyList();
+ } else {
+ List<TsFileSet> results = new ArrayList<>();
+ for (File fileSet : fileSets) {
+ TsFileSet tsFileSet;
+ try {
+ tsFileSet =
+ new TsFileSet(
+ Long.parseLong(fileSet.getName()),
+ fileSetDir.getAbsolutePath(),
+ true);
+ } catch (NumberFormatException e) {
+ continue;
+ }
+ results.add(tsFileSet);
+ }
+ return results;
}
- results.add(tsFileSet);
- }
- return results;
- }
- });
+ });
if (!tsFileSets.isEmpty()) {
tsFileSets.sort(null);
}
-
-
tsFileManager.add(tsFileResource, isSeq);
if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())
&& tsFileResource.resourceFileExists()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
index bc8b5946f79..fc363247b13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
@@ -19,17 +19,14 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.tsfile.file.metadata.TsFileMetadata;
-import org.apache.tsfile.utils.Pair;
-import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-/**
- * A schema evolution operation that renames a column in a table schema.
- */
+/** A schema evolution operation that renames a column in a table schema. */
public class ColumnRename implements SchemaEvolution {
private String tableName;
@@ -37,8 +34,7 @@ public class ColumnRename implements SchemaEvolution {
private String nameAfter;
// for deserialization
- public ColumnRename() {
- }
+ public ColumnRename() {}
public ColumnRename(String tableName, String nameBefore, String nameAfter) {
this.tableName = tableName.toLowerCase();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index 165c36fc99b..452f79b720b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -20,19 +20,16 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.tsfile.file.metadata.TableSchema;
public class EvolvedSchema {
// the evolved table names after applying all schema evolution operations
private final Map<String, String> originalTableNames = new HashMap<>();
+
/**
- * the first key is the evolved table name, the second key is the evolved
column name,
- * and the value is the original column name before any schema evolution.
+ * the first key is the evolved table name, the second key is the evolved
column name, and the
+ * value is the original column name before any schema evolution.
*/
private final Map<String, Map<String, String>> originalColumnNames = new
HashMap<>();
@@ -54,8 +51,8 @@ public class EvolvedSchema {
}
public void renameColumn(String tableName, String oldColumnName, String
newColumnName) {
- Map<String, String> columnNameMap =
originalColumnNames.computeIfAbsent(tableName,
- t -> new LinkedHashMap<>());
+ Map<String, String> columnNameMap =
+ originalColumnNames.computeIfAbsent(tableName, t -> new
LinkedHashMap<>());
if (!columnNameMap.containsKey(oldColumnName)) {
columnNameMap.put(newColumnName, oldColumnName);
// mark the old column name as non-exists
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
index 7b3b8b8e50d..5a670879beb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
@@ -19,14 +19,14 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
-import java.io.IOException;
-import java.io.InputStream;
import org.apache.iotdb.db.utils.io.StreamSerializable;
+
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
-/**
- * A schema evolution operation that can be applied to a TableSchemaMap.
- */
+import java.io.IOException;
+import java.io.InputStream;
+
+/** A schema evolution operation that can be applied to a TableSchemaMap. */
public interface SchemaEvolution extends StreamSerializable {
/**
@@ -63,4 +63,4 @@ public interface SchemaEvolution extends StreamSerializable {
evolution.deserialize(stream);
return evolution;
}
-}
\ No newline at end of file
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
index 1f10683c7bf..ac9e94bdc5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+import org.apache.iotdb.commons.utils.FileUtils;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -27,11 +29,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Collection;
-import org.apache.iotdb.commons.utils.FileUtils;
-/**
- * SchemaEvolutionFile manages schema evolutions related to a TsFileSet.
- */
+/** SchemaEvolutionFile manages schema evolutions related to a TsFileSet. */
public class SchemaEvolutionFile {
public static final String FILE_SUFFIX = ".sevo";
@@ -49,21 +48,24 @@ public class SchemaEvolutionFile {
long length = file.length();
String fileName = file.getName();
- long validLength =
Long.parseLong(fileName.substring(fileName.lastIndexOf('.')));
+ long validLength = parseValidLength(fileName);
if (length > validLength) {
- try (FileInputStream fis = new FileInputStream(file);
+ try (FileOutputStream fis = new FileOutputStream(file, true);
FileChannel fileChannel = fis.getChannel()) {
fileChannel.truncate(validLength);
}
}
}
+ public static long parseValidLength(String fileName) {
+ return Long.parseLong(fileName.substring(0, fileName.lastIndexOf('.')));
+ }
public void append(Collection<SchemaEvolution> schemaEvolutions) throws
IOException {
recoverFile();
try (FileOutputStream fos = new FileOutputStream(filePath, true);
- BufferedOutputStream bos = new BufferedOutputStream(fos)) {
+ BufferedOutputStream bos = new BufferedOutputStream(fos)) {
for (SchemaEvolution schemaEvolution : schemaEvolutions) {
schemaEvolution.serialize(bos);
}
@@ -81,7 +83,7 @@ public class SchemaEvolutionFile {
EvolvedSchema evolvedSchema = new EvolvedSchema();
try (FileInputStream fis = new FileInputStream(filePath);
- BufferedInputStream bis = new BufferedInputStream(fis)) {
+ BufferedInputStream bis = new BufferedInputStream(fis)) {
while (bis.available() > 0) {
SchemaEvolution evolution = SchemaEvolution.createFrom(bis);
evolution.applyTo(evolvedSchema);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
index aac78060b61..02f7f0cfd3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
@@ -19,33 +19,27 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.tsfile.file.metadata.TsFileMetadata;
-import org.apache.tsfile.utils.Pair;
-import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-/**
- * A schema evolution operation that renames a table in a schema map.
- */
+/** A schema evolution operation that renames a table in a schema map. */
public class TableRename implements SchemaEvolution {
private String nameBefore;
private String nameAfter;
// for deserialization
- public TableRename() {
- }
+ public TableRename() {}
public TableRename(String nameBefore, String nameAfter) {
this.nameBefore = nameBefore.toLowerCase();
this.nameAfter = nameAfter.toLowerCase();
}
-
@Override
public void applyTo(EvolvedSchema evolvedSchema) {
evolvedSchema.renameTable(nameBefore, nameAfter);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index 19ee2a7ed66..9d75031d30f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -19,18 +19,16 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile;
-/**
- * TsFileSet represents a set of TsFiles in a time partition whose version <=
endVersion.
- */
+/** TsFileSet represents a set of TsFiles in a time partition whose version <=
endVersion. */
public class TsFileSet implements Comparable<TsFileSet> {
public static final String FILE_SET_DIR_NAME = "filesets";
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java
new file mode 100644
index 00000000000..d4386dd8172
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+
+import java.io.FileOutputStream;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class SchemaEvolutionFileTest {
+
+ @After
+ public void tearDown() throws Exception {
+ clearSchemaEvolutionFile();
+ }
+
+ @Test
+ public void testSchemaEvolutionFile() throws IOException {
+ String filePath = TestConstant.BASE_OUTPUT_PATH + File.separator +
"0.sevo";
+
+ SchemaEvolutionFile schemaEvolutionFile = new
SchemaEvolutionFile(filePath);
+
+ // t1 -> t2, t2.s1 -> t2.s2, t3 -> t1
+ List<SchemaEvolution> schemaEvolutionList =
+ Arrays.asList(
+ new TableRename("t1", "t2"),
+ new ColumnRename("t2", "s1", "s2"),
+ new TableRename("t3", "t1"));
+ schemaEvolutionFile.append(schemaEvolutionList);
+
+ EvolvedSchema evolvedSchema = schemaEvolutionFile.readAsSchema();
+ assertEquals("t1", evolvedSchema.getOriginalTableName("t2"));
+ assertEquals("s1", evolvedSchema.getOriginalColumnName("t2", "s2"));
+ assertEquals("t3", evolvedSchema.getOriginalTableName("t1"));
+ // not evolved, should remain the same
+ assertEquals("t4", evolvedSchema.getOriginalTableName("t4"));
+ assertEquals("s3", evolvedSchema.getOriginalColumnName("t2", "s3"));
+
+ // t1 -> t2 -> t3, t2.s1 -> t2.s2 -> t3.s1, t3 -> t1 -> t2
+ schemaEvolutionList =
+ Arrays.asList(
+ new TableRename("t2", "t3"),
+ new ColumnRename("t3", "s2", "s1"),
+ new TableRename("t1", "t2"));
+ schemaEvolutionFile.append(schemaEvolutionList);
+ evolvedSchema = schemaEvolutionFile.readAsSchema();
+ assertEquals("t1", evolvedSchema.getOriginalTableName("t3"));
+ assertEquals("s1", evolvedSchema.getOriginalColumnName("t3", "s1"));
+ assertEquals("t3", evolvedSchema.getOriginalTableName("t2"));
+ // not evolved, should remain the same
+ assertEquals("t4", evolvedSchema.getOriginalTableName("t4"));
+ assertEquals("s3", evolvedSchema.getOriginalColumnName("t2", "s3"));
+ }
+
+ private void clearSchemaEvolutionFile() {
+ File dir = new File(TestConstant.BASE_OUTPUT_PATH);
+ File[] files = dir.listFiles(f ->
f.getName().endsWith(SchemaEvolutionFile.FILE_SUFFIX));
+ if (files != null) {
+ for (File file : files) {
+ file.delete();
+ }
+ }
+ }
+
+ @Test
+ public void testRecover() throws IOException {
+ String filePath = TestConstant.BASE_OUTPUT_PATH + File.separator +
"0.sevo";
+
+ SchemaEvolutionFile schemaEvolutionFile = new
SchemaEvolutionFile(filePath);
+ List<SchemaEvolution> schemaEvolutionList =
+ Arrays.asList(
+ new TableRename("t1", "t2"),
+ new ColumnRename("t2", "s1", "s2"),
+ new TableRename("t3", "t1"));
+ schemaEvolutionFile.append(schemaEvolutionList);
+
+ File dir = new File(TestConstant.BASE_OUTPUT_PATH);
+ File[] files = dir.listFiles(f ->
f.getName().endsWith(SchemaEvolutionFile.FILE_SUFFIX));
+ assertNotNull(files);
+ assertEquals(1, files.length);
+ assertEquals(24, SchemaEvolutionFile.parseValidLength(files[0].getName()));
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(files[0],
true)) {
+ fileOutputStream.write(new byte[100]);
+ }
+
+ schemaEvolutionFile = new SchemaEvolutionFile(files[0].getAbsolutePath());
+ EvolvedSchema evolvedSchema = schemaEvolutionFile.readAsSchema();
+ assertEquals("t1", evolvedSchema.getOriginalTableName("t2"));
+ assertEquals("s1", evolvedSchema.getOriginalColumnName("t2", "s2"));
+ assertEquals("t3", evolvedSchema.getOriginalTableName("t1"));
+ // not evolved, should remain the same
+ assertEquals("t4", evolvedSchema.getOriginalTableName("t4"));
+ assertEquals("s3", evolvedSchema.getOriginalColumnName("t2", "s3"));
+ }
+}