This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/support_schema_evolution by
this push:
new 9568d7c2 temp save
9568d7c2 is described below
commit 9568d7c29b44f209ccaf36a8d814d07807c9a238
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Nov 13 12:19:45 2025 +0800
temp save
---
.../apache/tsfile/file/metadata/TableSchema.java | 33 +++++++-
.../tsfile/file/metadata/TsFileMetadata.java | 13 +++
.../file/metadata/evolution/EvolvedSchema.java | 50 ++++++++++-
.../metadata/evolution/TsFileBackupProcessor.java | 97 ++++++++++++++++++++++
.../metadata/evolution/TsFileSchemaRewriter.java | 65 ++++++++++++---
.../apache/tsfile/read/TsFileSequenceReader.java | 64 ++++++++++++--
.../read/controller/MetadataQuerierByFileImpl.java | 2 +-
.../read/query/executor/TableQueryExecutor.java | 9 +-
.../tsfile/read/v4/DeviceTableModelReader.java | 7 +-
.../org/apache/tsfile/utils/TsFileSketchTool.java | 2 +-
.../tsfile/write/schema/MeasurementSchema.java | 10 +++
.../apache/tsfile/write/writer/TsFileIOWriter.java | 2 +-
.../evolution/TsFileBackupProcessorTest.java | 66 +++++++++++++++
.../evolution/TsFileSchemaEvolutionTest.java | 70 ++++++++++++++++
.../apache/tsfile/write/TsFileWriteApiTest.java | 2 +-
15 files changed, 459 insertions(+), 33 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index 3f77f3fa..6e2d2e68 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -51,12 +51,10 @@ public class TableSchema {
// columnName -> pos in columnSchemas
private Map<String, Integer> columnPosIndex;
- // columnName -> pos in all id columns
+ // columnName -> pos in all tag columns
private Map<String, Integer> tagColumnOrder;
private int tagColumnCnt = -1;
- private boolean deleted = false;
-
public TableSchema(String tableName) {
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>();
@@ -64,6 +62,14 @@ public class TableSchema {
this.updatable = true;
}
+ @SuppressWarnings("CopyConstructorMissesField")
+ public TableSchema(TableSchema another) {
+ this.tableName = another.tableName;
+ this.measurementSchemas = new ArrayList<>(another.measurementSchemas);
+ this.columnCategories = new ArrayList<>(another.columnCategories);
+ this.updatable = another.updatable;
+ }
+
// for deserialize
public TableSchema(
List<IMeasurementSchema> columnSchemas, List<ColumnCategory>
columnCategories) {
@@ -342,4 +348,25 @@ public class TableSchema {
tagColumnCnt = (int) columnCategories.stream().filter(c -> c ==
ColumnCategory.TAG).count();
return tagColumnCnt;
}
+
+ public void renameColumn(String oldName, String newName) {
+ int columnIndex = findColumnIndex(oldName);
+ if (columnIndex >= 0) {
+ MeasurementSchema measurementSchema =
measurementSchemas.get(columnIndex);
+ measurementSchema.setMeasurementName(newName);
+ measurementSchema.setOriginalMeasurementName(oldName);
+ // update the columnPosIndex map
+ if (columnPosIndex != null) {
+ columnPosIndex.remove(oldName);
+ columnPosIndex.put(newName, columnIndex);
+ }
+
+ if (tagColumnOrder != null) {
+ Integer order = tagColumnOrder.remove(oldName);
+ if (order != null) {
+ tagColumnOrder.put(newName, order);
+ }
+ }
+ }
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
index 21f89194..040ec83b 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
@@ -47,6 +47,7 @@ public class TsFileMetadata {
// List of <name, offset, childMetadataIndexType>
private Map<String, MetadataIndexNode> tableMetadataIndexNodeMap;
private Map<String, TableSchema> tableSchemaMap;
+ private Map<String, TableSchema> evolvedTableSchemaMap;
private boolean hasTableSchemaMapCache;
private Map<String, String> tsFileProperties;
@@ -305,6 +306,18 @@ public class TsFileMetadata {
return tableSchemaMap;
}
+ public Map<String, TableSchema> getEvolvedTableSchemaMap() {
+ if (evolvedTableSchemaMap != null) {
+ return evolvedTableSchemaMap;
+ }
+ if (evolvedSchema == null) {
+ evolvedTableSchemaMap = tableSchemaMap;
+ } else if (tableSchemaMap != null) {
+ evolvedTableSchemaMap =
evolvedSchema.getEvolvedTableSchemaMap(tableSchemaMap);
+ }
+ return evolvedTableSchemaMap;
+ }
+
public Map<String, String> getTsFileProperties() {
return tsFileProperties;
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/EvolvedSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/EvolvedSchema.java
index 2373e74a..4eece2bb 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/EvolvedSchema.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/EvolvedSchema.java
@@ -20,7 +20,12 @@
package org.apache.tsfile.file.metadata.evolution;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.tsfile.file.metadata.TableSchema;
public class EvolvedSchema {
// the evolved table names after applying all schema evolution operations
@@ -34,8 +39,11 @@ public class EvolvedSchema {
public void renameTable(String oldTableName, String newTableName) {
if (!originalTableNames.containsKey(oldTableName)) {
originalTableNames.put(newTableName, oldTableName);
+ // mark the old table name as non-existent
+ originalTableNames.put(oldTableName, "");
} else {
- String originalName = originalTableNames.remove(oldTableName);
+ // mark the old table name as non-existent
+ String originalName = originalTableNames.put(oldTableName, "");
originalTableNames.put(newTableName, originalName);
}
@@ -47,11 +55,13 @@ public class EvolvedSchema {
public void renameColumn(String tableName, String oldColumnName, String
newColumnName) {
Map<String, String> columnNameMap =
originalColumnNames.computeIfAbsent(tableName,
- t -> new HashMap<>());
+ t -> new LinkedHashMap<>());
if (!columnNameMap.containsKey(oldColumnName)) {
columnNameMap.put(newColumnName, oldColumnName);
+ // mark the old column name as non-existent
+ columnNameMap.put(oldColumnName, "");
} else {
- String originalName = columnNameMap.remove(oldColumnName);
+ String originalName = columnNameMap.put(oldColumnName, "");
columnNameMap.put(newColumnName, originalName);
}
}
@@ -67,4 +77,38 @@ public class EvolvedSchema {
}
return columnNameMap.getOrDefault(evolvedColumnName, evolvedColumnName);
}
+
+ public Map<String, TableSchema> getEvolvedTableSchemaMap(Map<String,
TableSchema> tableSchemaMap) {
+ Map<String, TableSchema> evolvedTableSchemaMap = new
HashMap<>(tableSchemaMap.size());
+ Set<String> renamedTables = new HashSet<>(originalTableNames.values());
+ // add renamed tables
+ for (Entry<String, String> finalAndOriginalTableName :
originalTableNames.entrySet()) {
+ String finalName = finalAndOriginalTableName.getKey();
+ String originalName = finalAndOriginalTableName.getValue();
+
+ TableSchema tableSchema = tableSchemaMap.get(originalName);
+ if (tableSchema != null) {
+ tableSchema = new TableSchema(tableSchema);
+ tableSchema.setTableName(finalName);
+ evolvedTableSchemaMap.put(finalName, tableSchema);
+ }
+ }
+ // add non-renamed tables
+ tableSchemaMap.entrySet().stream().filter(e ->
!renamedTables.contains(e.getKey())).forEach(e ->
evolvedTableSchemaMap.put(e.getKey(), new TableSchema(e.getValue())));
+
+ // rename columns
+ for (Entry<String, Map<String, String>> tableColumnNameMap :
originalColumnNames.entrySet()) {
+ String tableName = tableColumnNameMap.getKey();
+ Map<String, String> columnFinalAndOriginalNames =
tableColumnNameMap.getValue();
+ TableSchema tableSchema = evolvedTableSchemaMap.get(tableName);
+
+ for (Entry<String, String> columnFinalAndOriginalName :
columnFinalAndOriginalNames.entrySet()) {
+ String columnFinalName = columnFinalAndOriginalName.getKey();
+ String columnOriginalName = columnFinalAndOriginalName.getValue();
+ tableSchema.renameColumn(columnOriginalName, columnFinalName);
+ }
+ }
+
+ return evolvedTableSchemaMap;
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileBackupProcessor.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileBackupProcessor.java
new file mode 100644
index 00000000..66f70695
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileBackupProcessor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.file.metadata.evolution;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * TsFileBackupProcessor is responsible for writing a backup of a suffix of a
TsFile and recovering a
+ * TsFile from its backup file.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class TsFileBackupProcessor {
+
+ public static final String BACKUP_FILE_SUFFIX = ".backup";
+
+ public static void writeBackup(File tsFile, long backupPosition) throws
IOException {
+ // Backup layout: backupPosition (long), backupLength (long), then the TsFile bytes from backupPosition to the end of the file.
+ File backupFile = new File(tsFile.getAbsolutePath() + BACKUP_FILE_SUFFIX);
+ long backupLength = tsFile.length() - backupPosition;
+ try (FileChannel backupChannel = new
FileOutputStream(backupFile).getChannel();
+ FileChannel originalChannel = new
FileInputStream(tsFile).getChannel()) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(backupPosition);
+ buffer.flip();
+ backupChannel.write(buffer);
+ buffer.flip();
+ buffer.putLong(backupLength);
+ buffer.flip();
+ backupChannel.write(buffer);
+
+ originalChannel.transferTo(backupPosition, backupLength, backupChannel);
+ }
+ }
+
+ public static boolean hasBackup(File tsFile) {
+ File backupFile = new File(tsFile.getAbsolutePath() + BACKUP_FILE_SUFFIX);
+ return backupFile.exists();
+ }
+
+ public static void removeBackup(File tsFile) {
+ File backupFile = new File(tsFile.getAbsolutePath() + BACKUP_FILE_SUFFIX);
+ backupFile.delete();
+ }
+
+ public static void recoverFromBackup(File tsFile) throws IOException {
+ // If the backup is complete, truncate the TsFile at the recorded position and re-append the saved suffix; the backup file is deleted afterwards either way.
+ File backupFile = new File(tsFile.getAbsolutePath() + BACKUP_FILE_SUFFIX);
+ if (!backupFile.exists()) {
+ return;
+ }
+
+ try (FileChannel backupChannel = new
FileInputStream(backupFile).getChannel();
+ FileChannel originalChannel = new FileOutputStream(tsFile,
true).getChannel()) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2);
+ int read = backupChannel.read(buffer);
+ if (read != Long.BYTES * 2) {
+ // backup file is not complete, no need to recover
+ return;
+ }
+ buffer.flip();
+ long backupPosition = buffer.getLong();
+ long backupLength = buffer.getLong();
+
+ if (backupFile.length() < Long.BYTES * 2 + backupLength) {
+ // backup file is not complete, no need to recover
+ return;
+ }
+
+ originalChannel.position(backupPosition);
+ originalChannel.truncate(backupPosition);
+ backupChannel.transferTo(Long.BYTES * 2, backupLength, originalChannel);
+ } finally {
+ backupFile.delete();
+ }
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java
index d12cfb27..fb5be1fa 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java
@@ -20,15 +20,24 @@
package org.apache.tsfile.file.metadata.evolution;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchema;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.TsFileMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.TSRecord;
+import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.DataOutputStream;
@@ -37,12 +46,15 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.Map;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.v4.TsFileWriterBuilder;
/**
* A utility class to rewrite the schema of an existing TsFile by appending
new properties to its
* TsFileMetadata.
*/
public class TsFileSchemaRewriter {
+
private final String filePath;
public TsFileSchemaRewriter(String tsfilePath) {
@@ -76,6 +88,9 @@ public class TsFileSchemaRewriter {
// calculate the new metadata size
int newMetadataSize = propertiesOffset + newPropertiesSize;
+ File file = new File(filePath);
+ TsFileBackupProcessor.writeBackup(file, metadataOffset + propertiesOffset);
+
try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath,
"rw")) {
// write the new properties and update the metadata size
randomAccessFile.seek(metadataOffset + propertiesOffset);
@@ -83,23 +98,51 @@ public class TsFileSchemaRewriter {
randomAccessFile.writeInt(newMetadataSize);
randomAccessFile.write(TSFileConfig.MAGIC_STRING.getBytes(StandardCharsets.UTF_8));
}
+
+ TsFileBackupProcessor.removeBackup(file);
}
public static void main(String[] args) throws IOException,
WriteProcessException {
- String tsfilePath = "test.tsfile";
- try (TsFileWriter writer = new TsFileWriter(new File(tsfilePath))) {
- writer.registerTimeseries("d1", new MeasurementSchema("s1",
TSDataType.INT32));
- TSRecord record = new TSRecord("d1", 0);
- record.addPoint("s1", 100);
- writer.writeRecord(record);
+ int fileNum = 10000;
+ int evolutionNum = 100;
+ List<String> files = new ArrayList<>();
+ TableSchema tableSchema = new TableSchema(
+ "test_table",
+ Arrays.asList(
+ new ColumnSchema("s1", TSDataType.INT64, ColumnCategory.FIELD)
+ )
+ );
+ for (int i = 0; i < fileNum; i++) {
+ String tsfilePath = "test " + i + ".tsfile";
+ files.add(tsfilePath);
+ try (ITsFileWriter tsFileWriter = new TsFileWriterBuilder().file(new
File(tsfilePath))
+ .tableSchema(tableSchema).build()) {
+ Tablet tablet = new Tablet(tableSchema);
+ tablet.addTimestamp(0, 1L);
+ tablet.addValue("s1", 0, 100L);
+ tsFileWriter.write(tablet);
+ }
}
- TsFileSchemaRewriter rewriter = new TsFileSchemaRewriter(tsfilePath);
- rewriter.appendProperties(Collections.singletonMap("new_property_key",
"new_property_value"));
+ long start = System.currentTimeMillis();
+ for (String file : files) {
+ TsFileSchemaRewriter rewriter = new TsFileSchemaRewriter(file);
+ Map<String, String> newProperties = new LinkedHashMap<>();
+ for (int i = 0; i < evolutionNum; i++) {
+ SchemaEvolution evolution = new ColumnRename("t1", "s" + i, "s" + (i +
1));
+ newProperties.put(
+ evolution.propertyKey(),
+ evolution.propertyValue()
+ );
+ }
+ rewriter.appendProperties(newProperties);
+ }
+ System.out.println(
+ "Time taken to rewrite " + fileNum + " files: " +
(System.currentTimeMillis() - start)
+ + " ms");
- try (TsFileSequenceReader reader = new TsFileSequenceReader(tsfilePath)) {
- TsFileMetadata tsFileMetadata = reader.readFileMetadata();
- System.out.println("Updated TsFile Properties: " +
tsFileMetadata.getTsFileProperties());
+ for (String file : files) {
+ new File(file).delete();
}
}
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 5cb79954..1a21f624 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.read;
+import java.util.Optional;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
@@ -61,6 +62,7 @@ import org.apache.tsfile.file.metadata.enums.EncryptionType;
import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.evolution.EvolvedSchema;
+import org.apache.tsfile.file.metadata.evolution.TsFileBackupProcessor;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.common.BatchData;
@@ -215,6 +217,10 @@ public class TsFileSequenceReader implements AutoCloseable
{
resourceLogger.debug("{} reader is opened. {}", file,
getClass().getName());
}
this.file = file;
+ if (TsFileBackupProcessor.hasBackup(new File(file))) {
+ TsFileBackupProcessor.recoverFromBackup(new File(file));
+ }
+
tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
try {
@@ -484,10 +490,37 @@ public class TsFileSequenceReader implements
AutoCloseable {
return tsFileMetaData;
}
+ public Map<String, TableSchema> getEvolvedTableSchemaMap() throws
IOException {
+ return getEvolvedTableSchemaMap(null);
+ }
+
public Map<String, TableSchema> getTableSchemaMap() throws IOException {
return getTableSchemaMap(null);
}
+ public Optional<TableSchema> getTableSchema(String tableName) throws
IOException {
+ tableName = convertToOriginalTableName(tableName.toLowerCase());
+ return Optional.ofNullable(getTableSchemaMap(null).get(tableName));
+ }
+
+ public String convertToOriginalTableName(String tableName) throws
IOException {
+ TsFileMetadata tsFileMetadata = readFileMetadata();
+ EvolvedSchema evolvedSchema = tsFileMetadata.getEvolvedSchema(false);
+ if (evolvedSchema == null) {
+ return tableName;
+ }
+ return evolvedSchema.getOriginalTableName(tableName);
+ }
+
+ public String convertToOriginalColumnName(String tableName, String
columnName) throws IOException {
+ TsFileMetadata tsFileMetadata = readFileMetadata();
+ EvolvedSchema evolvedSchema = tsFileMetadata.getEvolvedSchema(false);
+ if (evolvedSchema == null) {
+ return columnName;
+ }
+ return evolvedSchema.getOriginalColumnName(tableName, columnName);
+ }
+
public Map<String, TableSchema> getTableSchemaMap(LongConsumer
ioSizeRecorder)
throws IOException {
if (tsFileMetaData != null && tsFileMetaData.hasTableSchemaMapCache()) {
@@ -502,6 +535,20 @@ public class TsFileSequenceReader implements AutoCloseable
{
return tempTsFileMetadata.getTableSchemaMap();
}
+ public Map<String, TableSchema> getEvolvedTableSchemaMap(LongConsumer
ioSizeRecorder)
+ throws IOException {
+ if (tsFileMetaData != null && tsFileMetaData.getEvolvedTableSchemaMap() !=
null) {
+ return tsFileMetaData.getEvolvedTableSchemaMap();
+ }
+ TsFileMetadata tempTsFileMetadata = forceReadFileMetadata(true,
ioSizeRecorder);
+ if (cacheTableSchemaMap) {
+ synchronized (this) {
+ this.tsFileMetaData = tempTsFileMetadata;
+ }
+ }
+ return tempTsFileMetadata.getEvolvedTableSchemaMap();
+ }
+
private TsFileMetadata forceReadFileMetadata(
boolean needTableSchemaMap, LongConsumer ioSizeRecorder) throws
IOException {
ByteBuffer buffer = readData(fileMetadataPos, fileMetadataSize,
ioSizeRecorder);
@@ -641,14 +688,21 @@ public class TsFileSequenceReader implements
AutoCloseable {
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
IDeviceID deviceInIndex = device;
+ String measurementInIndex = measurement;
EvolvedSchema evolvedSchema = tsFileMetaData.getEvolvedSchema(false);
if (evolvedSchema != null) {
- evolvedSchema.getOriginalTableName(device.getTableName());
- if (!tableSchema.getTableName().equals(device.getTableName())) {
+ String originalTableName =
evolvedSchema.getOriginalTableName(device.getTableName());
+ if (!originalTableName.equals(device.getTableName())) {
// the table has been renamed, use the original table name to get
deviceMetadataIndexNode
deviceInIndex = device.clone();
- deviceInIndex.setTableName(tableSchema.getTableName());
+ deviceInIndex.setTableName(originalTableName);
+ }
+ String originalMeasurement =
+ evolvedSchema.getOriginalColumnName(
+ deviceInIndex.getTableName(), measurement);
+ if (!originalMeasurement.equals(measurement)) {
+ measurementInIndex = originalMeasurement;
}
}
@@ -674,7 +728,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
metadataIndexPair =
getMetadataAndEndOffsetOfMeasurementNode(
- metadataIndexNode, measurement, false, ioSizeConsumer);
+ metadataIndexNode, measurementInIndex, false, ioSizeConsumer);
}
if (metadataIndexPair == null) {
return null;
@@ -711,7 +765,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
// return null if path does not exist in the TsFile
- int searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList, measurement);
+ int searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
measurementInIndex);
return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 68e19530..41bd6b59 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -68,7 +68,7 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
this.tsFileReader = tsFileReader;
this.tsFileReader.setEnableCacheTableSchemaMap();
this.fileMetaData = tsFileReader.readFileMetadata();
- this.tableSchemaMap = tsFileReader.getTableSchemaMap();
+ this.tableSchemaMap = tsFileReader.getEvolvedTableSchemaMap();
deviceIdChunkMetadataCache =
new LRUCache<Pair<IDeviceID, String>,
List<IChunkMetadata>>(CACHED_ENTRY_NUMBER) {
@Override
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
index c50d1bf1..2eeb4d08 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
@@ -42,6 +42,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
public class TableQueryExecutor {
@@ -124,11 +126,12 @@ public class TableQueryExecutor {
}
final ColumnCategory columnCategory =
schema.getColumnTypes().get(columnIndex);
- columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
+ MeasurementSchema measurementSchema = (MeasurementSchema)
schema.getColumnSchemas().get(columnIndex);
+
columnPosMap.computeIfAbsent(measurementSchema.getOriginalMeasurementName(), k
-> new ArrayList<>()).add(i);
if (columnCategory.equals(ColumnCategory.TAG)) {
- idColumns.add(columnName);
+ idColumns.add(measurementSchema.getOriginalMeasurementName());
} else {
- measurementColumns.add(columnName);
+ measurementColumns.add(measurementSchema.getOriginalMeasurementName());
}
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
index 25a89316..6ff1ea66 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
@@ -68,14 +68,13 @@ public class DeviceTableModelReader implements
ITsFileReader {
@TsFileApi
public List<TableSchema> getAllTableSchema() throws IOException {
- Map<String, TableSchema> tableSchemaMap = fileReader.getTableSchemaMap();
+ Map<String, TableSchema> tableSchemaMap =
fileReader.getEvolvedTableSchemaMap();
return new ArrayList<>(tableSchemaMap.values());
}
@TsFileApi
public Optional<TableSchema> getTableSchemas(String tableName) throws
IOException {
- Map<String, TableSchema> tableSchemaMap = fileReader.getTableSchemaMap();
- return Optional.ofNullable(tableSchemaMap.get(tableName.toLowerCase()));
+ return fileReader.getTableSchema(tableName);
}
@TsFileApi
@@ -89,7 +88,7 @@ public class DeviceTableModelReader implements ITsFileReader {
String tableName, List<String> columnNames, long startTime, long
endTime, Filter tagFilter)
throws ReadProcessException, IOException, NoTableException,
NoMeasurementException {
String lowerCaseTableName = tableName.toLowerCase();
- TableSchema tableSchema =
fileReader.getTableSchemaMap().get(lowerCaseTableName);
+ TableSchema tableSchema =
fileReader.getEvolvedTableSchemaMap().get(lowerCaseTableName);
if (tableSchema == null) {
throw new NoTableException(tableName);
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileSketchTool.java
b/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileSketchTool.java
index 2bf2fd5e..257e5c62 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileSketchTool.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileSketchTool.java
@@ -202,7 +202,7 @@ public class TsFileSketchTool {
}
// table schema
- Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
+ Map<String, TableSchema> tableSchemaMap =
reader.getEvolvedTableSchemaMap();
printlnBoth(pw, String.format("%20s", pos) + "|\tTableSchemaCnt=" +
tableSchemaMap.size());
pos += Integer.BYTES;
for (Entry<String, TableSchema> entry : tableSchemaMap.entrySet()) {
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
index c70c84cc..9b883ae8 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
@@ -53,6 +53,8 @@ public class MeasurementSchema
+ RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class);
private String measurementName;
+ // if the measurement has been renamed, originalMeasurementName records the
old name
+ private String originalMeasurementName;
private TSDataType dataType;
private TSEncoding encoding;
private CompressionType compressionType;
@@ -480,4 +482,12 @@ public class MeasurementSchema
public void setDeleted(boolean deleted) {
this.deleted = deleted;
}
+
+ public String getOriginalMeasurementName() {
+ return originalMeasurementName != null ? originalMeasurementName :
measurementName;
+ }
+
+ public void setOriginalMeasurementName(String originalMeasurementName) {
+ this.originalMeasurementName = originalMeasurementName;
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 4019d235..667aadd1 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -546,7 +546,7 @@ public class TsFileIOWriter implements AutoCloseable {
TsFileMetadata tsFileMetadata = new TsFileMetadata();
tsFileMetadata.setTableMetadataIndexNodeMap(tableNodesMap);
- tsFileMetadata.setTableSchemaMap(new
TableSchemaMap(schema.getTableSchemaMap()));
+ tsFileMetadata.setTableSchemaMap(new
HashMap<>(schema.getTableSchemaMap()));
tsFileMetadata.setMetaOffset(metaOffset);
tsFileMetadata.setBloomFilter(filter);
tsFileMetadata.addProperty("encryptLevel", encryptLevel);
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileBackupProcessorTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileBackupProcessorTest.java
new file mode 100644
index 00000000..4a44051a
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileBackupProcessorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.file.metadata.evolution;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchema;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.v4.TsFileWriterBuilder;
+import org.junit.Test;
+
+public class TsFileBackupProcessorTest {
+
+ @Test
+ public void testBackupAndRecover() throws IOException, WriteProcessException
{
+ File file = new File("target" + File.separator + "test.tsfile");
+ TableSchema tableSchema = new TableSchema(
+ "test_table",
+ Arrays.asList(
+ new ColumnSchema("s1", TSDataType.INT64, ColumnCategory.FIELD)
+ )
+ );
+ try (ITsFileWriter tsFileWriter = new
TsFileWriterBuilder().file(file).tableSchema(tableSchema).build()) {
+ Tablet tablet = new Tablet(tableSchema);
+ tablet.addTimestamp(0, 1L);
+ tablet.addValue("s1", 0, 100L);
+ tsFileWriter.write(tablet);
+ }
+
+ long backupPosition = file.length() / 2;
+ TsFileBackupProcessor.writeBackup(file, backupPosition);
+ try (FileChannel fileChannel = new FileOutputStream(file,
true).getChannel()) {
+ fileChannel.truncate(backupPosition);
+ }
+
+ TsFileSequenceReader reader = new TsFileSequenceReader(file.getPath());
+ assertEquals(tableSchema, reader.getTableSchema("test_table").get());
+ }
+}
\ No newline at end of file
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaEvolutionTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaEvolutionTest.java
index 798844dd..90bf4513 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaEvolutionTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaEvolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.tsfile.file.metadata.evolution;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -41,6 +42,7 @@ import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.read.v4.ITsFileReader;
import org.apache.tsfile.read.v4.TsFileReaderBuilder;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.v4.DeviceTableModelWriter;
import org.junit.After;
@@ -187,4 +189,72 @@ public class TsFileSchemaEvolutionTest {
assertEquals(3, timeseriesMetadata.getStatistics().getMaxValue());
}
}
+
+  @Test
+  public void testColumnRename()
+      throws IOException, ReadProcessException, NoTableException, NoMeasurementException {
+    // Step 1: append a ColumnRename property that renames t1.f1 -> t1.f3
+    SchemaEvolution evolution = new ColumnRename("t1", "f1", "f3");
+    TsFileSchemaRewriter rewriter = new TsFileSchemaRewriter(TEST_FILE_PATH);
+    rewriter.appendProperties(Collections.singletonMap(evolution.propertyKey(), evolution.propertyValue()));
+
+    // Verify the column has been renamed: f1 is gone from the schema and f3 is an INT32 column
+    try (ITsFileReader reader = new TsFileReaderBuilder().file(new File(TEST_FILE_PATH)).build()) {
+      Optional<TableSchema> t = reader.getTableSchemas("t1");
+      assertTrue(t.isPresent());
+      TableSchema tableSchema = t.get();
+      assertNull(tableSchema.findColumnSchema("f1"));
+      IMeasurementSchema schema = tableSchema.findColumnSchema("f3");
+      assertEquals(TSDataType.INT32, schema.getType());
+      // Querying the old name f1 must fail; the new name f3 must return the row
+      assertThrows(NoMeasurementException.class, () -> reader.query("t1", Arrays.asList("f1", "f2"), 0, 10));
+      ResultSet resultSet = reader.query("t1", Arrays.asList("f3", "f2"), 0, 10);
+      assertTrue(resultSet.next());
+      assertEquals(3, resultSet.getInt("f3"));
+      assertEquals(4.0, resultSet.getDouble("f2"), 0.0001);
+      assertFalse(resultSet.next());
+    }
+
+    // Step 2: rename t1.f2 -> t1.f1, reusing the name f1 that step 1 freed
+    evolution = new ColumnRename("t1", "f2", "f1");
+    rewriter.appendProperties(Collections.singletonMap(evolution.propertyKey(), evolution.propertyValue()));
+
+    // Verify the column has been renamed: f2 is gone from the schema and f1 is now the DOUBLE column
+    try (ITsFileReader reader = new TsFileReaderBuilder().file(new File(TEST_FILE_PATH)).build()) {
+      Optional<TableSchema> t = reader.getTableSchemas("t1");
+      assertTrue(t.isPresent());
+      TableSchema tableSchema = t.get();
+      assertNull(tableSchema.findColumnSchema("f2"));
+      IMeasurementSchema schema = tableSchema.findColumnSchema("f1");
+      assertEquals(TSDataType.DOUBLE, schema.getType());
+      // Querying the old name f2 on t1 must fail; f3 and the new f1 must return the row
+      assertThrows(NoMeasurementException.class, () -> reader.query("t1", Arrays.asList("f3", "f2"), 0, 10));
+      ResultSet resultSet = reader.query("t1", Arrays.asList("f3", "f1"), 0, 10);
+      assertTrue(resultSet.next());
+      assertEquals(3, resultSet.getInt("f3"));
+      assertEquals(4.0, resultSet.getDouble("f1"), 0.0001);
+      assertFalse(resultSet.next());
+    }
+
+    // t3 is not affected: it keeps its name and its own f1/f2 columns and values
+    try (ITsFileReader reader = new TsFileReaderBuilder().file(new File(TEST_FILE_PATH)).build()) {
+      Optional<TableSchema> t3 = reader.getTableSchemas("t3");
+      assertTrue(t3.isPresent());
+      TableSchema tableSchema = t3.get();
+      assertEquals("t3", tableSchema.getTableName());
+
+      ResultSet resultSet = reader.query("t3", Arrays.asList("f1", "f2"), 0, 10);
+      assertTrue(resultSet.next());
+      assertEquals(23, resultSet.getInt("f1"));
+      assertEquals(24.0, resultSet.getDouble("f2"), 0.0001);
+      assertFalse(resultSet.next());
+    }
+
+    // The timeseries metadata of the t1 device must be reachable under the renamed measurement f3
+    try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(TEST_FILE_PATH)) {
+      TimeseriesMetadata timeseriesMetadata = sequenceReader.readTimeseriesMetadata(
+          Factory.DEFAULT_FACTORY.create(new String[]{"t1", "t1-tag1", "t1-tag2"}), "f3", false);
+      assertEquals(3, timeseriesMetadata.getStatistics().getMaxValue());
+    }
+  }
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 6449b52d..37bf7f2d 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -1102,7 +1102,7 @@ public class TsFileWriteApiTest {
writer.writeTable(tablet);
}
try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) {
- Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
+ Map<String, TableSchema> tableSchemaMap =
reader.getEvolvedTableSchemaMap();
TableSchema tableSchemaInTsFile = tableSchemaMap.get("table1");
Assert.assertNotNull(tableSchemaInTsFile);
for (IMeasurementSchema columnSchema :
tableSchemaInTsFile.getColumnSchemas()) {