This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/support_schema_evolution by
this push:
new 06a848fd846 rewrite lastFlushTimeMap
06a848fd846 is described below
commit 06a848fd846e923c66de3fb1280864c222913c66
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Nov 27 18:43:08 2025 +0800
rewrite lastFlushTimeMap
---
.gitignore | 1 +
.../db/storageengine/dataregion/DataRegion.java | 18 ++-
.../dataregion/DeviceLastFlushTime.java | 13 ++
.../dataregion/HashLastFlushTimeMap.java | 30 ++++-
.../storageengine/dataregion/ILastFlushTime.java | 3 +
.../dataregion/ILastFlushTimeMap.java | 3 +
.../dataregion/PartitionLastFlushTime.java | 6 +
.../dataregion/tsfile/TsFileResource.java | 5 +-
.../dataregion/tsfile/evolution/EvolvedSchema.java | 42 +++++--
.../dataregion/tsfile/evolution/TableRename.java | 38 ++++++
.../storageengine/dataregion/DataRegionTest.java | 136 +++++++++++++++------
11 files changed, 242 insertions(+), 53 deletions(-)
diff --git a/.gitignore b/.gitignore
index a489dd048da..e4c7c982393 100644
--- a/.gitignore
+++ b/.gitignore
@@ -124,3 +124,4 @@
iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/
.mvn/.gradle-enterprise/
.mvn/.develocity/
.run/
+*.sevo
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4a28034c84e..f288e81f82c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -126,6 +126,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -167,6 +168,7 @@ import org.apache.thrift.TException;
import org.apache.tsfile.external.commons.io.FileUtils;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
@@ -1196,6 +1198,8 @@ public class DataRegion implements IDataRegionForQuery {
return;
}
DataNodeTableCache.getInstance().invalid(databaseName);
+ schemaEvolutions.forEach(lastFlushTimeMap::accept);
+
syncCloseAllWorkingTsFileProcessors();
for (Entry<Long, Long> partitionVersionEntry :
partitionMaxFileVersions.entrySet()) {
@@ -2607,6 +2611,7 @@ public class DataRegion implements IDataRegionForQuery {
* @param tsFileResources includes sealed and unsealed tsfile resources
* @return fill unsealed tsfile resources with memory data and
ChunkMetadataList of data in disk
*/
+ @SuppressWarnings("SuspiciousSystemArraycopy")
private List<TsFileResource> getFileResourceListForQuery(
Collection<TsFileResource> tsFileResources,
List<IFullPath> pathList,
@@ -2619,7 +2624,18 @@ public class DataRegion implements IDataRegionForQuery {
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
for (TsFileResource tsFileResource : tsFileResources) {
- if (!tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, isSeq,
context.isDebug())) {
+ EvolvedSchema evolvedSchema;
+ try {
+ evolvedSchema = tsFileResource.getMergedEvolvedSchema();
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
+ IDeviceID deviceIdBackThen = singleDeviceId;
+ if (evolvedSchema != null) {
+ deviceIdBackThen = evolvedSchema.rewriteDeviceId(singleDeviceId);
+ }
+
+ if (!tsFileResource.isSatisfied(deviceIdBackThen, globalTimeFilter,
isSeq, context.isDebug())) {
continue;
}
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
index f02044b0414..e7c13f149a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import java.util.List;
+import java.util.stream.Collectors;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
import org.apache.tsfile.file.metadata.IDeviceID;
import java.util.HashMap;
@@ -53,4 +57,13 @@ public class DeviceLastFlushTime implements ILastFlushTime {
Map<IDeviceID, Long> getDeviceLastFlushTimeMap() {
return deviceLastFlushTimeMap;
}
+
+ @Override
+ public void accept(SchemaEvolution schemaEvolution) {
+ if (!(schemaEvolution instanceof TableRename)) {
+ return;
+ }
+ TableRename tableRename = (TableRename) schemaEvolution;
+ tableRename.rewriteMap(deviceLastFlushTimeMap);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 3f0abfd3e24..cf8badd6d08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,9 +19,16 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +77,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
timePartitionId, id -> new DeviceLastFlushTime());
long memIncr = 0L;
- for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ for (Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
}
@@ -93,7 +100,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
long maxFlushTime = flushTimeMapForPartition.getLastFlushTime(null);
ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime();
long memIncr = 0;
- for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ for (Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(),
entry.getValue());
maxFlushTime = Math.max(maxFlushTime, entry.getValue());
@@ -104,7 +111,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
} else {
// go here when DeviceLastFlushTime was recovered by wal recovery
long memIncr = 0;
- for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ for (Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
}
@@ -131,7 +138,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
// go here when DeviceLastFlushTime was recovered by wal recovery
DeviceLastFlushTime deviceLastFlushTime = (DeviceLastFlushTime)
flushTimeMapForPartition;
Map<IDeviceID, Long> flushedTimeMap =
deviceLastFlushTime.getDeviceLastFlushTimeMap();
- for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ for (Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
flushTimeMapForPartition.updateLastFlushTime(entry.getKey(),
entry.getValue());
}
}
@@ -139,7 +146,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
@Override
public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap) {
- for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
+ for (Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
globalLatestFlushedTimeForEachDevice.merge(entry.getKey(),
entry.getValue(), Math::max);
}
}
@@ -161,7 +168,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
// For insert
@Override
public void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long>
updateMap) {
- for (Map.Entry<IDeviceID, Long> entry : updateMap.entrySet()) {
+ for (Entry<IDeviceID, Long> entry : updateMap.entrySet()) {
partitionLatestFlushedTime
.computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
.updateLastFlushTime(entry.getKey(), entry.getValue());
@@ -212,4 +219,15 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
return 0;
}
+
+ @Override
+ public void accept(SchemaEvolution schemaEvolution) {
+ if (!(schemaEvolution instanceof TableRename)) {
+ return;
+ }
+
+ TableRename tableRename = (TableRename) schemaEvolution;
+ tableRename.rewriteMap(globalLatestFlushedTimeForEachDevice);
+ partitionLatestFlushedTime.values().forEach(t ->
t.accept(schemaEvolution));
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
index be68369a42b..f78b6822e90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.tsfile.file.metadata.IDeviceID;
public interface ILastFlushTime {
@@ -28,4 +29,6 @@ public interface ILastFlushTime {
void updateLastFlushTime(IDeviceID device, long time);
ILastFlushTime degradeLastFlushTime();
+
+ void accept(SchemaEvolution schemaEvolution);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 7bdd141bf6b..40186cef573 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.tsfile.file.metadata.IDeviceID;
import java.util.Map;
@@ -63,4 +64,6 @@ public interface ILastFlushTimeMap {
void degradeLastFlushTime(long partitionId);
long getMemSize(long partitionId);
+
+ void accept(SchemaEvolution schemaEvolution);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
index a5976861441..100d0238618 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.tsfile.file.metadata.IDeviceID;
public class PartitionLastFlushTime implements ILastFlushTime {
@@ -43,4 +44,9 @@ public class PartitionLastFlushTime implements ILastFlushTime
{
public ILastFlushTime degradeLastFlushTime() {
return this;
}
+
+ @Override
+ public void accept(SchemaEvolution schemaEvolution) {
+ // no-op
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index d26172122a6..80ab28eec9b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
-import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
@@ -213,7 +212,7 @@ public class TsFileResource implements PersistentResource,
Cloneable {
private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues;
// TsFileSets this TsFile belongs to
- private List<TsFileSet> tsFileSets;
+ private final List<TsFileSet> tsFileSets = new ArrayList<>();
@TestOnly
public TsFileResource() {
@@ -1641,7 +1640,7 @@ public class TsFileResource implements
PersistentResource, Cloneable {
return tsFileSets;
}
- public EvolvedSchema getMergedSchema() throws IOException {
+ public EvolvedSchema getMergedEvolvedSchema() throws IOException {
List<EvolvedSchema> list = new ArrayList<>();
for (TsFileSet fileSet : getTsFileSets()) {
EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index 13d6bc6fdea..cd899ccc97c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -24,6 +24,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
public class EvolvedSchema {
// the evolved table names after applying all schema evolution operations
@@ -38,9 +40,10 @@ public class EvolvedSchema {
public void renameTable(String oldTableName, String newTableName) {
if (!originalTableNames.containsKey(oldTableName)) {
originalTableNames.put(newTableName, oldTableName);
+ originalTableNames.put(oldTableName, "");
} else {
// mark the old table name as non-exists
- String originalName = originalTableNames.remove(oldTableName);
+ String originalName = originalTableNames.put(oldTableName, "");
originalTableNames.put(newTableName, originalName);
}
@@ -55,8 +58,10 @@ public class EvolvedSchema {
originalColumnNames.computeIfAbsent(tableName, t -> new
LinkedHashMap<>());
if (!columnNameMap.containsKey(oldColumnName)) {
columnNameMap.put(newColumnName, oldColumnName);
+ columnNameMap.put(oldColumnName, "");
} else {
- String originalName = columnNameMap.remove(oldColumnName);
+ // mark the old column name as non-exists
+ String originalName = columnNameMap.put(oldColumnName, "");
columnNameMap.put(newColumnName, originalName);
}
}
@@ -96,6 +101,25 @@ public class EvolvedSchema {
'}';
}
+ public IDeviceID rewriteDeviceId(IDeviceID deviceID) {
+ String tableName = deviceID.getTableName();
+ String originalTableName = getOriginalTableName(tableName);
+ return rewriteDeviceId(deviceID, originalTableName);
+ }
+
+ @SuppressWarnings("SuspiciousSystemArraycopy")
+ public static IDeviceID rewriteDeviceId(IDeviceID deviceID, String
originalTableName) {
+ String tableName = deviceID.getTableName();
+ if (!tableName.equals(originalTableName)) {
+ Object[] segments = deviceID.getSegments();
+ String[] newSegments = new String[segments.length];
+ newSegments[0] = originalTableName;
+ System.arraycopy(segments, 1, newSegments, 1, segments.length - 1);
+ return Factory.DEFAULT_FACTORY.create(newSegments);
+ }
+ return deviceID;
+ }
+
public static EvolvedSchema deepCopy(EvolvedSchema evolvedSchema) {
EvolvedSchema newEvolvedSchema = new EvolvedSchema();
newEvolvedSchema.originalTableNames = new
LinkedHashMap<>(evolvedSchema.originalTableNames);
@@ -123,15 +147,19 @@ public class EvolvedSchema {
if (schemas[i] != null) {
EvolvedSchema newSchema = schemas[i];
for (Entry<String, String> finalOriginalTableName :
newSchema.originalTableNames.entrySet()) {
- mergedSchema.renameTable(finalOriginalTableName.getValue(),
finalOriginalTableName.getKey());
+ if (!finalOriginalTableName.getValue().isEmpty()) {
+ mergedSchema.renameTable(finalOriginalTableName.getValue(),
finalOriginalTableName.getKey());
+ }
}
for (Entry<String, Map<String, String>>
finalTableNameColumnNameMapEntry : newSchema.originalColumnNames.entrySet()) {
for (Entry<String, String> finalColNameOriginalColNameEntry :
finalTableNameColumnNameMapEntry.getValue()
.entrySet()) {
- String finalTableName = finalTableNameColumnNameMapEntry.getKey();
- String finalColName = finalColNameOriginalColNameEntry.getKey();
- String originalColName =
finalColNameOriginalColNameEntry.getValue();
- mergedSchema.renameColumn(finalTableName, originalColName,
finalColName);
+ if (!finalColNameOriginalColNameEntry.getValue().isEmpty()) {
+ String finalTableName =
finalTableNameColumnNameMapEntry.getKey();
+ String finalColName = finalColNameOriginalColNameEntry.getKey();
+ String originalColName =
finalColNameOriginalColNameEntry.getValue();
+ mergedSchema.renameColumn(finalTableName, originalColName,
finalColName);
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
index 02f7f0cfd3e..02c9eaf0c26 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
@@ -19,6 +19,12 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iotdb.db.queryengine.execution.schedule.queue.ID;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -63,4 +69,36 @@ public class TableRename implements SchemaEvolution {
nameBefore = ReadWriteIOUtils.readVarIntString(stream);
nameAfter = ReadWriteIOUtils.readVarIntString(stream);
}
+
+ public String getNameBefore() {
+ return nameBefore;
+ }
+
+ public String getNameAfter() {
+ return nameAfter;
+ }
+
+ @SuppressWarnings("SuspiciousSystemArraycopy")
+ public IDeviceID rewriteDeviceId(IDeviceID deviceId) {
+ if (!deviceId.getTableName().equals(nameBefore)) {
+ return deviceId;
+ }
+
+ Object[] segments = deviceId.getSegments();
+ String[] newSegments = new String[segments.length];
+ newSegments[0] = nameAfter;
+ System.arraycopy(segments, 1, newSegments, 1, segments.length - 1);
+ return Factory.DEFAULT_FACTORY.create(newSegments);
+ }
+
+ public <T> void rewriteMap(Map<IDeviceID, T> map) {
+ List<IDeviceID> affectedDeviceId = map.keySet().stream()
+ .filter(k -> k.getTableName().equals(getNameBefore())).collect(
+ Collectors.toList());
+ for (IDeviceID deviceID : affectedDeviceId) {
+ IDeviceID newDeviceId = rewriteDeviceId(deviceID);
+ T removed = map.remove(deviceID);
+ map.put(newDeviceId, removed);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 921fe860b8c..5b692e70571 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import java.nio.charset.StandardCharsets;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -30,6 +31,7 @@ import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DataRegionException;
@@ -63,6 +65,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -71,10 +74,12 @@ import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.datapoint.DataPoint;
@@ -96,6 +101,8 @@ import java.util.List;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class DataRegionTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -266,7 +273,7 @@ public class DataRegionTest {
null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -314,7 +321,7 @@ public class DataRegionTest {
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -362,7 +369,7 @@ public class DataRegionTest {
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -451,7 +458,7 @@ public class DataRegionTest {
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -518,7 +525,7 @@ public class DataRegionTest {
times.length);
dataRegion.insertTablet(insertTabletNode2);
- Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
+ assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
dataRegion.syncDeleteDataFiles();
Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize());
@@ -603,7 +610,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -679,7 +686,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -706,10 +713,10 @@ public class DataRegionTest {
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -740,10 +747,10 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -773,10 +780,10 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(20, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultValue);
@@ -855,7 +862,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultEnableDiscard);
@@ -935,7 +942,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultEnableDiscard);
@@ -1015,7 +1022,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultEnableDiscard);
@@ -1055,7 +1062,7 @@ public class DataRegionTest {
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
dataRegion1.syncDeleteDataFiles();
}
@@ -1092,10 +1099,10 @@ public class DataRegionTest {
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
dataRegion1.syncDeleteDataFiles();
@@ -1160,10 +1167,10 @@ public class DataRegionTest {
Collections.singletonList(nonAlignedFullPath), device, context,
null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
IoTDBDescriptor.getInstance()
.getConfig()
@@ -1232,7 +1239,7 @@ public class DataRegionTest {
+ CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
Assert.assertFalse(logFile.exists());
Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly());
- Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
+ assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
} finally {
new CompactionConfigRestorer().restoreCompactionConfig();
}
@@ -1392,10 +1399,10 @@ public class DataRegionTest {
for (int i = 0; i < dataRegion.getSequenceFileList().size(); i++) {
TsFileResource resource = dataRegion.getSequenceFileList().get(i);
if (i == 1) {
- Assert.assertTrue(resource.anyModFileExists());
+ assertTrue(resource.anyModFileExists());
Assert.assertEquals(2, resource.getAllModEntries().size());
} else if (i == 3) {
- Assert.assertTrue(resource.anyModFileExists());
+ assertTrue(resource.anyModFileExists());
Assert.assertEquals(1, resource.getAllModEntries().size());
} else {
Assert.assertFalse(resource.anyModFileExists());
@@ -1489,7 +1496,7 @@ public class DataRegionTest {
dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"),
deleteDataNode4);
dataRegion.syncCloseAllWorkingTsFileProcessors();
- Assert.assertTrue(tsFileResource.anyModFileExists());
+ assertTrue(tsFileResource.anyModFileExists());
Assert.assertEquals(3, tsFileResource.getAllModEntries().size());
}
@@ -1584,7 +1591,7 @@ public class DataRegionTest {
dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"),
deleteDataNode12);
dataRegion.syncCloseAllWorkingTsFileProcessors();
- Assert.assertTrue(tsFileResource.anyModFileExists());
+ assertTrue(tsFileResource.anyModFileExists());
Assert.assertEquals(3, tsFileResource.getAllModEntries().size());
}
@@ -1686,7 +1693,7 @@ public class DataRegionTest {
new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 100);
deleteDataNode1.setSearchIndex(0);
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode1);
- Assert.assertTrue(tsFileResource.getTsFile().exists());
+ assertTrue(tsFileResource.getTsFile().exists());
Assert.assertFalse(tsFileResource.anyModFileExists());
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -1696,8 +1703,8 @@ public class DataRegionTest {
new DeleteDataNode(new PlanNodeId("2"),
Collections.singletonList(path), 100, 120);
deleteDataNode2.setSearchIndex(0);
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode2);
- Assert.assertTrue(tsFileResource.getTsFile().exists());
- Assert.assertTrue(tsFileResource.anyModFileExists());
+ assertTrue(tsFileResource.getTsFile().exists());
+ assertTrue(tsFileResource.anyModFileExists());
// delete data in closed file, and time all match
DeleteDataNode deleteDataNode3 =
@@ -1727,8 +1734,8 @@ public class DataRegionTest {
dataRegion.syncCloseWorkingTsFileProcessors(true);
TsFileResource tsFileResourceUnSeq =
dataRegion.getTsFileManager().getTsFileList(false).get(0);
- Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ assertTrue(tsFileResourceSeq.getTsFile().exists());
+ assertTrue(tsFileResourceUnSeq.getTsFile().exists());
// already closed, will have a mods file.
MeasurementPath path = new MeasurementPath("root.vehicle.d0.**");
@@ -1743,9 +1750,9 @@ public class DataRegionTest {
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode2);
// delete data in mem table, there is no mods
- Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceSeq.anyModFileExists());
+ assertTrue(tsFileResourceSeq.getTsFile().exists());
+ assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ assertTrue(tsFileResourceSeq.anyModFileExists());
Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -1753,8 +1760,8 @@ public class DataRegionTest {
new DeleteDataNode(new PlanNodeId("3"),
Collections.singletonList(path), 40, 80);
deleteDataNode3.setSearchIndex(0);
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode3);
- Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceUnSeq.anyModFileExists());
+ assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ assertTrue(tsFileResourceUnSeq.anyModFileExists());
// seq file and unseq file have data file and mod file now,
// this deletion will remove data file and mod file.
@@ -1772,4 +1779,61 @@ public class DataRegionTest {
Assert.assertFalse(tsFileResourceSeq.anyModFileExists());
Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
}
+
+ @Test
+ public void testSchemaEvolution()
+ throws IllegalPathException, WriteProcessException,
QueryProcessException {
+ String[] measurements = {"tag1", "s1", "s2"};
+ MeasurementSchema[] measurementSchemas = {
+ new MeasurementSchema("tag1", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT64),
+ new MeasurementSchema("s2", TSDataType.DOUBLE)
+ };
+ RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode(new
PlanNodeId(""),
+ new PartialPath(new String[] {"table1"}),
+ true,
+ measurements,
+ new TSDataType[]{TSDataType.STRING, TSDataType.INT64,
TSDataType.DOUBLE},
+ measurementSchemas,
+ 10,
+ new Object[]{new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L,
1.0},
+ false,
+ new TsTableColumnCategory[]{TsTableColumnCategory.TAG,
TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD});
+ dataRegion.insert(insertRowNode);
+
+ // table1 -> table2
+ dataRegion.applySchemaEvolution(Collections.singletonList(new
TableRename("table1", "table2")));
+
+ // cannot query with the old name
+ IDeviceID deviceID1 = Factory.DEFAULT_FACTORY.create(new
String[]{"table1", "tag1"});
+ List<IFullPath> fullPaths = Arrays.asList(
+ new AlignedFullPath(deviceID1, Arrays.asList(measurements),
Arrays.asList(measurementSchemas))
+ );
+ QueryDataSource dataSource = dataRegion.query(fullPaths, deviceID1, new
QueryContext(), null,
+ Collections.singletonList(0L), Long.MAX_VALUE);
+ assertTrue(dataSource.getSeqResources().isEmpty());
+
+ // can query with the new name
+ IDeviceID deviceID2 = Factory.DEFAULT_FACTORY.create(new
String[]{"table2", "tag1"});
+ fullPaths = Arrays.asList(
+ new AlignedFullPath(deviceID2, Arrays.asList(measurements),
Arrays.asList(measurementSchemas))
+ );
+ dataSource = dataRegion.query(fullPaths, deviceID2, new QueryContext(),
null,
+ Collections.singletonList(0L), Long.MAX_VALUE);
+ assertEquals(1, dataSource.getSeqResources().size());
+
+ // write again with table1
+ insertRowNode = new RelationalInsertRowNode(new PlanNodeId(""),
+ new PartialPath(new String[] {"table1"}),
+ true,
+ measurements,
+ new TSDataType[]{TSDataType.STRING, TSDataType.INT64,
TSDataType.DOUBLE},
+ measurementSchemas,
+ 10,
+ new Object[]{new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L,
1.0},
+ false,
+ new TsTableColumnCategory[]{TsTableColumnCategory.TAG,
TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD});
+ dataRegion.insert(insertRowNode);
+
+ }
}