This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/support_schema_evolution by
this push:
new eb1df4a5f9d EvolvedSchema may merge
eb1df4a5f9d is described below
commit eb1df4a5f9df0cc52608931f44e844299802b1f9
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Nov 27 16:26:35 2025 +0800
EvolvedSchema may merge
---
.../db/storageengine/dataregion/DataRegion.java | 138 +++++++++++++++------
.../dataregion/tsfile/TsFileManager.java | 22 ++++
.../dataregion/tsfile/TsFileResource.java | 8 ++
.../dataregion/tsfile/evolution/EvolvedSchema.java | 69 +++++++++--
.../dataregion/tsfile/fileset/TsFileSet.java | 18 ++-
.../tsfile/evolution/EvolvedSchemaTest.java | 57 +++++++++
6 files changed, 265 insertions(+), 47 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index cb6803a270a..b1520c56ec9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -126,6 +126,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
@@ -648,7 +649,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
// ensure that seq and unseq files in the same partition have the same
TsFileSet
- Map<Long, List<TsFileSet>> recoveredTsFileSetMap = new HashMap<>();
+ Map<Long, List<TsFileSet>> recoveredPartitionTsFileSetMap = new
HashMap<>();
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpSeqTsFiles.entrySet()) {
Callable<Void> asyncRecoverTask =
@@ -658,7 +659,7 @@ public class DataRegion implements IDataRegionForQuery {
partitionFiles.getValue(),
fileTimeIndexMap,
true,
- recoveredTsFileSetMap);
+ recoveredPartitionTsFileSetMap);
if (asyncRecoverTask != null) {
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
}
@@ -672,7 +673,7 @@ public class DataRegion implements IDataRegionForQuery {
partitionFiles.getValue(),
fileTimeIndexMap,
false,
- recoveredTsFileSetMap);
+ recoveredPartitionTsFileSetMap);
if (asyncRecoverTask != null) {
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
}
@@ -990,52 +991,74 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+  /**
+   * Returns the directory that holds the {@link TsFileSet}s of one time partition, i.e.
+   * {@code <dataRegionSysDir>/<partitionId>/<TsFileSet.FILE_SET_DIR_NAME>}.
+   */
+  private String getFileSetsDir(long partitionId) {
+    String partitionDir = dataRegionSysDir + File.separator + partitionId;
+    return partitionDir + File.separator + TsFileSet.FILE_SET_DIR_NAME;
+  }
+
+  /**
+   * Returns the {@link TsFileSet}s of a partition in ascending sorted order. On the first access
+   * they are loaded from disk and cached in {@code tsFileSetMap}.
+   *
+   * <p>When the partition has at least one set, the last set in sorted order is also recorded in
+   * {@code lastTsFileSetMap}. Directory entries whose names are not numbers are ignored.
+   *
+   * @param partitionId the time partition to recover
+   * @param tsFileSetMap cache of already recovered sets, keyed by partition id
+   * @return the sorted sets of the partition; an immutable empty list if there are none
+   */
+  private List<TsFileSet> recoverTsFileSets(
+      long partitionId, Map<Long, List<TsFileSet>> tsFileSetMap) {
+    // recoverFilesInPartition calls this for every TsFileResource of the partition. Loading,
+    // sorting and publishing the last set therefore happen only once, in the loader.
+    return tsFileSetMap.computeIfAbsent(partitionId, pid -> loadSortedTsFileSets(pid));
+  }
+
+  /** Lists the file-set directories of a partition, sorts them and records the last one. */
+  private List<TsFileSet> loadSortedTsFileSets(long partitionId) {
+    File fileSetDir = new File(getFileSetsDir(partitionId));
+    File[] fileSets = fileSetDir.listFiles();
+    if (fileSets == null || fileSets.length == 0) {
+      return Collections.emptyList();
+    }
+    List<TsFileSet> results = new ArrayList<>(fileSets.length);
+    for (File fileSet : fileSets) {
+      TsFileSet tsFileSet;
+      try {
+        // the directory name of a file set is its end version
+        tsFileSet =
+            new TsFileSet(
+                Long.parseLong(fileSet.getName()), fileSetDir.getAbsolutePath(), true);
+      } catch (NumberFormatException e) {
+        continue;
+      }
+      results.add(tsFileSet);
+    }
+    if (!results.isEmpty()) {
+      results.sort(null);
+      lastTsFileSetMap.put(partitionId, results.get(results.size() - 1));
+    }
+    return results;
+  }
+
+
private Callable<Void> recoverFilesInPartition(
long partitionId,
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
Map<TsFileID, FileTimeIndex> fileTimeIndexMap,
boolean isSeq,
- Map<Long, List<TsFileSet>> tsFileSetMap) {
+ Map<Long, List<TsFileSet>> partitionTsFileSetMap) {
List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
Callable<Void> asyncRecoverTask = null;
for (TsFileResource tsFileResource : resourceList) {
- List<TsFileSet> tsFileSets =
- tsFileSetMap.computeIfAbsent(
- partitionId,
- pid -> {
- File fileSetDir =
- new File(
- dataRegionSysDir
- + File.separator
- + partitionId
- + File.separator
- + TsFileSet.FILE_SET_DIR_NAME);
- File[] fileSets = fileSetDir.listFiles();
- if (fileSets == null || fileSets.length == 0) {
- return Collections.emptyList();
- } else {
- List<TsFileSet> results = new ArrayList<>();
- for (File fileSet : fileSets) {
- TsFileSet tsFileSet;
- try {
- tsFileSet =
- new TsFileSet(
- Long.parseLong(fileSet.getName()),
- fileSetDir.getAbsolutePath(),
- true);
- } catch (NumberFormatException e) {
- continue;
- }
- results.add(tsFileSet);
- }
- return results;
- }
- });
- if (!tsFileSets.isEmpty()) {
- tsFileSets.sort(null);
+      List<TsFileSet> tsFileSets = recoverTsFileSets(partitionId, partitionTsFileSetMap);
+      long fileVersion = tsFileResource.getTsFileID().fileVersion;
+      // For an absent key, Collections.binarySearch returns -(insertionPoint) - 1. The insertion
+      // point is the first set that sorts after the key, presumably the first set whose end
+      // version is greater than fileVersion (assumes TsFileSet orders by end version).
+      int i = Collections.binarySearch(tsFileSets, TsFileSet.comparatorKey(fileVersion));
+      if (i < 0) {
+        i = -i - 1;
+      }
+      // Sets from index i onward end at or after this file's version, so they apply to this file.
+      if (i < tsFileSets.size()) {
+        List<TsFileSet> containedSets = tsFileSets.subList(i, tsFileSets.size());
+        containedSets.forEach(tsFileResource::addFileSet);
+      }
tsFileManager.add(tsFileResource, isSeq);
@@ -1158,6 +1181,45 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+  /**
+   * Creates a new, non-recovered {@link TsFileSet} with end version {@code maxVersion} in the
+   * file-set directory of the partition. It then attaches the set to every sequence and
+   * unsequence TsFile currently managed in that partition.
+   *
+   * @return the newly created set
+   */
+  private TsFileSet createNewFileSet(long maxVersion, long partitionId) {
+    String fileSetsDir = getFileSetsDir(partitionId);
+    TsFileSet fileSet = new TsFileSet(maxVersion, fileSetsDir, false);
+    tsFileManager.addTsFileSet(fileSet, partitionId);
+    return fileSet;
+  }
+
+ public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) {
+ long startTime = System.nanoTime();
+ writeLock("InsertRow");
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
+ try {
+ if (deleted) {
+ return;
+ }
+
+ syncCloseAllWorkingTsFileProcessors();
+
+ for (Entry<Long, Long> partitionVersionEntry :
partitionMaxFileVersions.entrySet()) {
+ long partitionId = partitionVersionEntry.getKey();
+ long maxVersion = partitionVersionEntry.getValue();
+ lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> {
+ if (lastSet == null) {
+ lastSet = createNewFileSet(maxVersion, partitionId);
+ } else if (lastSet.getEndVersion() < maxVersion) {
+ lastSet = createNewFileSet(maxVersion, partitionId);
+ }
+ try {
+ lastSet.appendSchemaEvolution(schemaEvolutions);
+ } catch (IOException e) {
+ logger.error("Cannot append schema evolutions to fileSets in
partition {}-{}", dataRegionId, partitionId, e);
+ }
+ return lastSet;
+ });
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* insert one row of data.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index b7c1ba2c14f..4466668ad5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement;
import
org.apache.iotdb.db.storageengine.dataregion.modification.PartitionLevelModFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
@@ -507,4 +508,25 @@ public class TsFileManager {
}
return maxFileTimestamp;
}
+
+  /**
+   * Attaches {@code newSet} to every sequence and unsequence TsFileResource currently managed in
+   * the given partition. The manager's write lock is held while doing so.
+   *
+   * @param newSet the file set to attach
+   * @param partitionId the time partition whose resources receive the set
+   */
+  public void addTsFileSet(TsFileSet newSet, long partitionId) {
+    writeLock("addTsFileSet");
+    try {
+      attachFileSet(sequenceFiles.get(partitionId), newSet);
+      attachFileSet(unsequenceFiles.get(partitionId), newSet);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /** Adds {@code fileSet} to each resource in {@code resources}; a null list is ignored. */
+  private static void attachFileSet(TsFileResourceList resources, TsFileSet fileSet) {
+    if (resources == null) {
+      return;
+    }
+    for (TsFileResource resource : resources) {
+      resource.addFileSet(fileSet);
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 13c00febe3c..53b07166b94 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1630,4 +1630,12 @@ public class TsFileResource implements
PersistentResource, Cloneable {
public TsFileResource shallowCloneForNative() throws
CloneNotSupportedException {
return (TsFileResource) clone();
}
+
+  /** Appends {@code tsFileSet} to the file sets associated with this resource. */
+  public void addFileSet(TsFileSet tsFileSet) {
+    this.tsFileSets.add(tsFileSet);
+  }
+
+  /**
+   * Returns the file sets associated with this resource. This is the internal list itself, not a
+   * copy, so changes made through it are visible to this resource.
+   */
+  public List<TsFileSet> getTsFileSets() {
+    return this.tsFileSets;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index 452f79b720b..f56a30bd1e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -22,25 +22,25 @@ package
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
public class EvolvedSchema {
// the evolved table names after applying all schema evolution operations
- private final Map<String, String> originalTableNames = new HashMap<>();
+ private Map<String, String> originalTableNames = new LinkedHashMap<>();
/**
* the first key is the evolved table name, the second key is the evolved
column name, and the
* value is the original column name before any schema evolution.
*/
- private final Map<String, Map<String, String>> originalColumnNames = new
HashMap<>();
+ private Map<String, Map<String, String>> originalColumnNames = new
LinkedHashMap<>();
public void renameTable(String oldTableName, String newTableName) {
if (!originalTableNames.containsKey(oldTableName)) {
originalTableNames.put(newTableName, oldTableName);
- // mark the old table name as non-exists
- originalTableNames.put(oldTableName, "");
} else {
// mark the old table name as non-exists
- String originalName = originalTableNames.put(oldTableName, "");
+ String originalName = originalTableNames.remove(oldTableName);
originalTableNames.put(newTableName, originalName);
}
@@ -55,10 +55,8 @@ public class EvolvedSchema {
originalColumnNames.computeIfAbsent(tableName, t -> new
LinkedHashMap<>());
if (!columnNameMap.containsKey(oldColumnName)) {
columnNameMap.put(newColumnName, oldColumnName);
- // mark the old column name as non-exists
- columnNameMap.put(oldColumnName, "");
} else {
- String originalName = columnNameMap.put(oldColumnName, "");
+ String originalName = columnNameMap.remove(oldColumnName);
columnNameMap.put(newColumnName, originalName);
}
}
@@ -74,4 +72,59 @@ public class EvolvedSchema {
}
return columnNameMap.getOrDefault(evolvedColumnName, evolvedColumnName);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EvolvedSchema that = (EvolvedSchema) o;
+ return Objects.equals(originalTableNames, that.originalTableNames)
+ && Objects.equals(originalColumnNames, that.originalColumnNames);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(originalTableNames, originalColumnNames);
+ }
+
+  /** Renders both name maps for debugging. */
+  @Override
+  public String toString() {
+    return String.format(
+        "EvolvedSchema{originalTableNames=%s, originalColumnNames=%s}",
+        originalTableNames, originalColumnNames);
+  }
+
+  /**
+   * Creates an independent copy of {@code evolvedSchema}. The table-name map and every per-table
+   * column-name map are copied, so renames applied to the copy never affect the source, and vice
+   * versa. Insertion order is preserved, matching the LinkedHashMaps used by the fields.
+   *
+   * @param evolvedSchema the schema to copy; must not be null
+   * @return a new schema equal to {@code evolvedSchema}
+   */
+  public static EvolvedSchema deepCopy(EvolvedSchema evolvedSchema) {
+    EvolvedSchema newEvolvedSchema = new EvolvedSchema();
+    newEvolvedSchema.originalTableNames = new LinkedHashMap<>(evolvedSchema.originalTableNames);
+    Map<String, Map<String, String>> columnNamesCopy = new LinkedHashMap<>();
+    for (Entry<String, Map<String, String>> tableColumns :
+        evolvedSchema.originalColumnNames.entrySet()) {
+      // The inner maps must be copied too: renameColumn mutates them in place.
+      columnNamesCopy.put(tableColumns.getKey(), new LinkedHashMap<>(tableColumns.getValue()));
+    }
+    newEvolvedSchema.originalColumnNames = columnNamesCopy;
+    return newEvolvedSchema;
+  }
+
+ public static EvolvedSchema merge(EvolvedSchema oldSchema, EvolvedSchema
newSchema) {
+ if (oldSchema == null) {
+ return newSchema;
+ }
+ if (newSchema == null) {
+ return oldSchema;
+ }
+
+ EvolvedSchema mergedSchema = deepCopy(oldSchema);
+ for (Entry<String, String> finalOriginalTableName :
newSchema.originalTableNames.entrySet()) {
+ mergedSchema.renameTable(finalOriginalTableName.getValue(),
finalOriginalTableName.getKey());
+ }
+ for (Entry<String, Map<String, String>> finalTableNameColumnNameMapEntry :
newSchema.originalColumnNames.entrySet()) {
+ for (Entry<String, String> finalColNameOriginalColNameEntry :
finalTableNameColumnNameMapEntry.getValue()
+ .entrySet()) {
+ String finalTableName = finalTableNameColumnNameMapEntry.getKey();
+ String finalColName = finalColNameOriginalColNameEntry.getKey();
+ String originalColName = finalColNameOriginalColNameEntry.getValue();
+ mergedSchema.renameColumn(finalTableName, originalColName,
finalColName);
+ }
+ }
+
+ return mergedSchema;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index 9d75031d30f..d3271d98b45 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -35,12 +35,24 @@ public class TsFileSet implements Comparable<TsFileSet> {
private final long endVersion;
private final File fileSetDir;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock lock;
private SchemaEvolutionFile schemaEvolutionFile;
+  /**
+   * Creates a key-only instance that carries just {@code endVersion}, used as a binary-search
+   * key. It has no directory and no lock, so it must not be used as a real file set.
+   */
+  private TsFileSet(long endVersion) {
+    this.fileSetDir = null;
+    this.lock = null;
+    this.endVersion = endVersion;
+  }
+
+ public static TsFileSet comparatorKey(long endVersion) {
+ return new TsFileSet(endVersion);
+ }
+
public TsFileSet(long endVersion, String fileSetsDir, boolean recover) {
this.endVersion = endVersion;
this.fileSetDir = new File(fileSetsDir + File.separator + endVersion);
+ this.lock = new ReentrantReadWriteLock();
if (recover) {
recover();
@@ -104,4 +116,8 @@ public class TsFileSet implements Comparable<TsFileSet> {
public void readUnlock() {
lock.readLock().unlock();
}
+
+  /** Returns the end version of this file set, which is also the name of its directory. */
+  public long getEndVersion() {
+    return this.endVersion;
+  }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
new file mode 100644
index 00000000000..d9c76be4710
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class EvolvedSchemaTest {
+
+  /** Applies every evolution of {@code evolutions}, in order, to each target schema in turn. */
+  private static void applyAll(List<SchemaEvolution> evolutions, EvolvedSchema... targets) {
+    for (EvolvedSchema target : targets) {
+      for (SchemaEvolution evolution : evolutions) {
+        evolution.applyTo(target);
+      }
+    }
+  }
+
+  @Test
+  public void testMerge() {
+    // first batch: t1 -> t2, t2.s1 -> t2.s2, t3 -> t1
+    List<SchemaEvolution> firstBatch =
+        Arrays.asList(
+            new TableRename("t1", "t2"),
+            new ColumnRename("t2", "s1", "s2"),
+            new TableRename("t3", "t1"));
+    EvolvedSchema oldSchema = new EvolvedSchema();
+    // reference schema: both batches applied to one schema, in order
+    EvolvedSchema allSchema = new EvolvedSchema();
+    applyAll(firstBatch, oldSchema, allSchema);
+
+    // second batch: t1 -> t2 -> t3, t2.s1 -> t2.s2 -> t3.s1, t3 -> t1 -> t2
+    List<SchemaEvolution> secondBatch =
+        Arrays.asList(
+            new TableRename("t2", "t3"),
+            new ColumnRename("t3", "s2", "s1"),
+            new TableRename("t1", "t2"));
+    EvolvedSchema newSchema = new EvolvedSchema();
+    applyAll(secondBatch, newSchema, allSchema);
+
+    EvolvedSchema mergedSchema = EvolvedSchema.merge(oldSchema, newSchema);
+
+    assertEquals(allSchema, mergedSchema);
+  }
+}
\ No newline at end of file