This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch support_schema_evolution in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit b8c1c2f7fcf5ed43f500110d264d7f452a0b66fa Author: Tian Jiang <[email protected]> AuthorDate: Thu Oct 30 18:32:35 2025 +0800 add basic supports --- cpp/pom.xml | 14 +++ cpp/src/utils/db_utils.h | 6 +- .../tsfile/file/metadata/LogicalTableSchema.java | 11 +-- .../apache/tsfile/file/metadata/TableSchema.java | 89 ++++++++++++++--- .../tsfile/file/metadata/TableSchemaMap.java | 53 +++++++++++ .../tsfile/file/metadata/TsFileMetadata.java | 20 ++-- .../file/metadata/evolution/ColumnRename.java | 103 ++++++++++++++++++++ .../file/metadata/evolution/SchemaEvolution.java | 77 +++++++++++++++ .../file/metadata/evolution/TableRename.java | 81 ++++++++++++++++ .../metadata/evolution/TsFileSchemaRewriter.java | 105 +++++++++++++++++++++ .../read/filter/factory/TagFilterBuilder.java | 2 +- .../reader/block/SingleDeviceTsBlockReader.java | 2 +- .../org/apache/tsfile/utils/ReadWriteIOUtils.java | 14 +++ .../org/apache/tsfile/write/record/TSRecord.java | 2 +- .../tsfile/write/schema/IMeasurementSchema.java | 2 + .../tsfile/write/schema/MeasurementSchema.java | 26 +++++ .../write/schema/VectorMeasurementSchema.java | 5 + .../apache/tsfile/write/writer/TsFileIOWriter.java | 3 +- .../write/writer/VectorMeasurementSchemaStub.java | 4 + 19 files changed, 588 insertions(+), 31 deletions(-) diff --git a/cpp/pom.xml b/cpp/pom.xml index fa6b0070..be12e32b 100644 --- a/cpp/pom.xml +++ b/cpp/pom.xml @@ -62,6 +62,20 @@ </goals> </execution--> <!-- Generate the configuration for the test compilation --> + <execution> + <id>cmake-generate-compile</id> + <phase>compile</phase> + <goals> + <goal>generate</goal> + </goals> + </execution> + <execution> + <id>cmake-execute-compile</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> <execution> <id>cmake-generate-test-compile</id> <phase>generate-test-sources</phase> diff --git a/cpp/src/utils/db_utils.h b/cpp/src/utils/db_utils.h index 90d0b969..e42b2909 100644 --- a/cpp/src/utils/db_utils.h +++ 
b/cpp/src/utils/db_utils.h @@ -23,7 +23,11 @@ #include <stdint.h> #include <stdio.h> #include <string.h> // memcpy -#include <sys/time.h> +#if defined(_WIN32) || defined(WIN32) +#include <time.h> +#else +#include <sys/time.h> +#endif #include <iostream> #include <sstream> diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java index a7d8a026..df79a8a1 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java @@ -23,7 +23,6 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; -import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.IOException; @@ -49,15 +48,15 @@ public class LogicalTableSchema extends TableSchema { this.maxLevel = Math.max(this.maxLevel, chunkGroupMetadata.getDevice().segmentNum()); } - private List<IMeasurementSchema> generateIdColumns() { - List<IMeasurementSchema> generatedIdColumns = new ArrayList<>(); + private List<MeasurementSchema> generateTagColumns() { + List<MeasurementSchema> generatedTagColumns = new ArrayList<>(); // level 0 is table name, not id column for (int i = 1; i < maxLevel; i++) { - generatedIdColumns.add( + generatedTagColumns.add( new MeasurementSchema( "__level" + i, TSDataType.STRING, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED)); } - return generatedIdColumns; + return generatedTagColumns; } public void finalizeColumnSchema() { @@ -65,7 +64,7 @@ public class LogicalTableSchema extends TableSchema { return; } - List<IMeasurementSchema> allColumns = new ArrayList<>(generateIdColumns()); + List<MeasurementSchema> allColumns = new 
ArrayList<>(generateTagColumns());
List<ColumnCategory> allColumnCategories = ColumnCategory.nCopy(ColumnCategory.TAG, allColumns.size()); allColumns.addAll(measurementSchemas); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java index 70ddc690..51ec72b5 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java @@ -45,18 +45,23 @@ public class TableSchema { // the tableName is not serialized since the TableSchema is always stored in a Map, from whose // key the tableName can be known protected String tableName; - protected List<IMeasurementSchema> measurementSchemas; + // the last name after renames + protected String finalTableName; + protected List<MeasurementSchema> measurementSchemas; protected List<ColumnCategory> columnCategories; protected boolean updatable = false; // columnName -> pos in columnSchemas private Map<String, Integer> columnPosIndex; // columnName -> pos in all id columns - private Map<String, Integer> idColumnOrder; + private Map<String, Integer> tagColumnOrder; private int tagColumnCnt = -1; + private boolean deleted = false; + public TableSchema(String tableName) { this.tableName = tableName.toLowerCase(); + this.finalTableName = tableName; this.measurementSchemas = new ArrayList<>(); this.columnCategories = new ArrayList<>(); this.updatable = true; @@ -85,6 +90,7 @@ public class TableSchema { List<IMeasurementSchema> columnSchemas, List<ColumnCategory> columnCategories) { this.tableName = tableName.toLowerCase(); + this.finalTableName = tableName; this.measurementSchemas = new ArrayList<>(columnSchemas.size()); this.columnPosIndex = new HashMap<>(columnSchemas.size()); for (int i = 0; i < columnSchemas.size(); i++) { @@ -113,6 +119,7 @@ public class TableSchema { List<TSDataType> dataTypeList, List<ColumnCategory> categoryList) { this.tableName = 
tableName.toLowerCase(); + this.finalTableName = tableName; this.measurementSchemas = new ArrayList<>(columnNameList.size()); this.columnPosIndex = new HashMap<>(columnNameList.size()); for (int i = 0; i < columnNameList.size(); i++) { @@ -131,6 +138,7 @@ public class TableSchema { @TsFileApi public TableSchema(String tableName, List<ColumnSchema> columnSchemaList) { this.tableName = tableName.toLowerCase(); + this.finalTableName = tableName; this.measurementSchemas = new ArrayList<>(columnSchemaList.size()); this.columnCategories = new ArrayList<>(columnSchemaList.size()); this.columnPosIndex = new HashMap<>(columnSchemaList.size()); @@ -164,17 +172,17 @@ public class TableSchema { return columnPosIndex; } for (int i = 0; i < measurementSchemas.size(); i++) { - IMeasurementSchema currentColumnSchema = measurementSchemas.get(i); - columnPosIndex.putIfAbsent(currentColumnSchema.getMeasurementName(), i); + MeasurementSchema currentColumnSchema = measurementSchemas.get(i); + columnPosIndex.putIfAbsent(currentColumnSchema.getFinalMeasurementName(), i); } return columnPosIndex; } - public Map<String, Integer> getIdColumnOrder() { - if (idColumnOrder == null) { - idColumnOrder = new HashMap<>(); + public Map<String, Integer> getTagColumnOrder() { + if (tagColumnOrder == null) { + tagColumnOrder = new HashMap<>(); } - return idColumnOrder; + return tagColumnOrder; } /** @@ -187,7 +195,7 @@ public class TableSchema { lowerCaseColumnName, colName -> { for (int i = 0; i < measurementSchemas.size(); i++) { - if (measurementSchemas.get(i).getMeasurementName().equals(lowerCaseColumnName)) { + if (measurementSchemas.get(i).getFinalMeasurementName().equals(lowerCaseColumnName)) { return i; } } @@ -199,15 +207,15 @@ public class TableSchema { * @return i if the given column is the i-th ID column, -1 if the column is not in the schema or * not an ID column */ - public int findIdColumnOrder(String columnName) { + public int findTagColumnOrder(String columnName) { final String 
lowerCaseColumnName = columnName.toLowerCase(); - return getIdColumnOrder() + return getTagColumnOrder() .computeIfAbsent( lowerCaseColumnName, colName -> { int columnOrder = 0; for (int i = 0; i < measurementSchemas.size(); i++) { - if (measurementSchemas.get(i).getMeasurementName().equals(lowerCaseColumnName) + if (measurementSchemas.get(i).getFinalMeasurementName().equals(lowerCaseColumnName) && columnCategories.get(i) == ColumnCategory.TAG) { return columnOrder; } else if (columnCategories.get(i) == ColumnCategory.TAG) { @@ -248,7 +256,7 @@ public class TableSchema { } public List<IMeasurementSchema> getColumnSchemas() { - return measurementSchemas; + return new ArrayList<>(measurementSchemas); } public List<ColumnCategory> getColumnTypes() { @@ -297,8 +305,17 @@ public class TableSchema { return tableName; } + public String getFinalTableName() { + return finalTableName; + } + public void setTableName(String tableName) { this.tableName = tableName.toLowerCase(); + this.finalTableName = tableName; + } + + public void setFinalTableName(String finalTableName) { + this.finalTableName = finalTableName; } @Override @@ -306,7 +323,7 @@ public class TableSchema { return "TableSchema{" + "tableName='" + tableName - + '\'' + + '\'' + (!Objects.equals(finalTableName, tableName) ? 
"(" + finalTableName + ")" : "") + ", columnSchemas=" + measurementSchemas + ", columnTypes=" @@ -340,4 +357,48 @@ public class TableSchema { tagColumnCnt = (int) columnCategories.stream().filter(c -> c == ColumnCategory.TAG).count(); return tagColumnCnt; } + + public void renameColumn(String nameBefore, String nameAfter) { + nameBefore = nameBefore.toLowerCase(); + nameAfter = nameAfter.toLowerCase(); + int beforeNameIndx = findColumnIndex(nameBefore); + if (beforeNameIndx == -1) { + return; + } + int afterNameIndex = findColumnIndex(nameAfter); + + measurementSchemas.get(beforeNameIndx).setFinalMeasurementName(nameAfter); + // update the columnPosIndex map + if (columnPosIndex != null) { + columnPosIndex.remove(nameBefore); + columnPosIndex.put(nameAfter, beforeNameIndx); + } + + if (tagColumnOrder != null) { + if (tagColumnOrder.containsKey(nameBefore)) { + int order = tagColumnOrder.remove(nameBefore); + tagColumnOrder.put(nameAfter, order); + } + } + + // if the renamed column already exists, then it must be removed previously + if (afterNameIndex != -1) { + measurementSchemas.remove(afterNameIndex).setDeleted(true); + columnCategories.remove(afterNameIndex); + // need to rebuild the columnPosIndex map + columnPosIndex = null; + buildColumnPosIndex(); + // need to rebuild the tagColumnOrder map + tagColumnOrder = null; + getTagColumnOrder(); + } + } + + public boolean isDeleted() { + return deleted; + } + + public void setDeleted(boolean deleted) { + this.deleted = deleted; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchemaMap.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchemaMap.java new file mode 100644 index 00000000..3ada1137 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchemaMap.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata; + +import java.util.HashMap; +import java.util.Map; +import org.apache.tsfile.file.metadata.evolution.SchemaEvolution; +import org.apache.tsfile.file.metadata.evolution.SchemaEvolution.Builder; + +/** + * A map from table names to their corresponding TableSchema objects. + */ +public class TableSchemaMap extends HashMap<String,TableSchema> { + + public TableSchemaMap() { + } + + public TableSchemaMap(Map<String, TableSchema> tableSchemaMap) { + this.putAll(tableSchemaMap); + } + + /** + * Update this TableSchemaMap according to the given TSFile properties. 
+ * + * @param tsFileProperties the TSFile properties + */ + public void update(Map<String, String> tsFileProperties) { + Builder schemaEvolutionBuilder = new Builder(); + tsFileProperties.entrySet().forEach(entry -> { + SchemaEvolution evolution = schemaEvolutionBuilder.fromProperty(entry); + if (evolution != null) { + evolution.applyTo(this); + } + }); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java index e6a72879..12474bd0 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java @@ -19,6 +19,7 @@ package org.apache.tsfile.file.metadata; +import java.util.LinkedHashMap; import org.apache.tsfile.compatibility.DeserializeConfig; import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.exception.encrypt.EncryptException; @@ -29,7 +30,6 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; @@ -42,7 +42,7 @@ public class TsFileMetadata { // List of <name, offset, childMetadataIndexType> private Map<String, MetadataIndexNode> tableMetadataIndexNodeMap; - private Map<String, TableSchema> tableSchemaMap; + private TableSchemaMap tableSchemaMap; private boolean hasTableSchemaMapCache; private Map<String, String> tsFileProperties; @@ -91,7 +91,7 @@ public class TsFileMetadata { // tableSchemas int tableSchemaNum = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); - Map<String, TableSchema> tableSchemaMap = new HashMap<>(); + TableSchemaMap tableSchemaMap = new TableSchemaMap(); for (int i = 0; i < tableSchemaNum; i++) { String tableName = ReadWriteIOUtils.readVarIntString(buffer); TableSchema tableSchema = 
context.tableSchemaBufferDeserializer.deserialize(buffer, context); @@ -116,7 +116,7 @@ public class TsFileMetadata { if (buffer.hasRemaining()) { int propertiesSize = ReadWriteForEncodingUtils.readVarInt(buffer); - Map<String, String> propertiesMap = new HashMap<>(); + Map<String, String> propertiesMap = new LinkedHashMap<>(); for (int i = 0; i < propertiesSize; i++) { String key = ReadWriteIOUtils.readVarIntString(buffer); String value = ReadWriteIOUtils.readVarIntString(buffer); @@ -163,6 +163,10 @@ public class TsFileMetadata { "Unsupported encryptLevel: " + propertiesMap.get("encryptLevel")); } fileMetaData.tsFileProperties = propertiesMap; + + if (needTableSchemaMap){ + tableSchemaMap.update(propertiesMap); + } } return fileMetaData; @@ -170,7 +174,7 @@ public class TsFileMetadata { public void addProperty(String key, String value) { if (tsFileProperties == null) { - tsFileProperties = new HashMap<>(); + tsFileProperties = new LinkedHashMap<>(); } tsFileProperties.put(key, value); } @@ -261,7 +265,7 @@ public class TsFileMetadata { this.tableMetadataIndexNodeMap = tableMetadataIndexNodeMap; } - public void setTableSchemaMap(Map<String, TableSchema> tableSchemaMap) { + public void setTableSchemaMap(TableSchemaMap tableSchemaMap) { this.tableSchemaMap = tableSchemaMap; this.hasTableSchemaMapCache = true; } @@ -289,4 +293,8 @@ public class TsFileMetadata { public Map<String, String> getTsFileProperties() { return tsFileProperties; } + + public int getPropertiesOffset() { + return propertiesOffset; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/ColumnRename.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/ColumnRename.java new file mode 100644 index 00000000..8cdcdb0b --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/ColumnRename.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata.evolution; + +import java.util.Map; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TableSchemaMap; + +/** + * A schema evolution operation that renames a column in a table schema. + */ +public class ColumnRename implements SchemaEvolution { + + static final String KEY_PREFIX = "ColumnRename:"; + + private final String tableName; + private final String nameBefore; + private final String nameAfter; + + public ColumnRename(String tableName, String nameBefore, String nameAfter) { + this.tableName = tableName.toLowerCase(); + this.nameBefore = nameBefore.toLowerCase(); + this.nameAfter = nameAfter.toLowerCase(); + } + + @Override + public void applyTo(TableSchemaMap schemaMap) { + TableSchema tableSchema = schemaMap.get(tableName); + if (tableSchema == null) { + return; + } + + tableSchema.renameColumn(nameBefore, nameAfter); + } + + /** + * @return "SEV:ColumnRename:{tableNameLength},{nameBeforeLength},{tableName},{nameBefore}" + */ + @Override + public String propertyKey() { + return SchemaEvolution.KEY_PREFIX + KEY_PREFIX + tableName.length() + "," + nameBefore.length() + "," + tableName + "," + nameBefore; + } + + /** + * @return "{nameAfter}" + */ + @Override + public String 
propertyValue() { + return nameAfter; + } + + public static ColumnRename fromProperty(Map.Entry<String, String> property) { + String key = property.getKey(); + String nameAfter = property.getValue(); + + int i = KEY_PREFIX.length(); + int tableNameLengthStart = i; + for (; i < key.length(); i++) { + if (key.charAt(i) == ',') { + break; + } + } + int tableNameLength = Integer.parseInt(key.substring(tableNameLengthStart, i)); + + // move past ',' + i ++; + int nameBeforeLengthStart = i; + for (; i < key.length(); i++) { + if (key.charAt(i) == ',') { + break; + } + } + int nameBeforeLength = Integer.parseInt(key.substring(nameBeforeLengthStart, i)); + + // move past ',' + i ++; + String tableName = key.substring(i, i + tableNameLength); + i += tableNameLength; + + // move past ',' + i ++; + String nameBefore = key.substring(i, i + nameBeforeLength); + + return new ColumnRename(tableName, nameBefore, nameAfter); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/SchemaEvolution.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/SchemaEvolution.java new file mode 100644 index 00000000..68b17815 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/SchemaEvolution.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata.evolution; + +import java.util.Map.Entry; +import org.apache.tsfile.file.metadata.TableSchemaMap; + +/** + * A schema evolution operation that can be applied to a TableSchemaMap. + */ +public interface SchemaEvolution { + String KEY_PREFIX = "SEV:"; + + /** + * Apply this schema evolution operation to the given schema map. + * + * @param schemaMap the schema map to apply the operation to + */ + void applyTo(TableSchemaMap schemaMap); + + /** + * Get the property key representing this schema evolution operation. + * All keys should start with the {@link #KEY_PREFIX}. + * + * @return the property key + */ + String propertyKey(); + + /** + * Get the property value representing this schema evolution operation. + * + * @return the property value + */ + String propertyValue(); + + class Builder { + /** + * Create a SchemaEvolution instance from the given property entry. 
+ * + * @param property the property entry + * @return the SchemaEvolution instance, or null if the property key does not match any known + * schema evolution operation + */ + public SchemaEvolution fromProperty(Entry<String, String> property) { + String key = property.getKey(); + + if (!key.startsWith(KEY_PREFIX)) { + return null; + } + Entry<String, String> subProperty = new java.util.AbstractMap.SimpleImmutableEntry<>(key.substring(KEY_PREFIX.length()), property.getValue()); // strip "SEV:" -- the sub-type parsers expect keys like "TableRename:..." + if (subProperty.getKey().startsWith(TableRename.KEY_PREFIX)) { + return TableRename.fromProperty(subProperty); + } else if (subProperty.getKey().startsWith(ColumnRename.KEY_PREFIX)) { + return ColumnRename.fromProperty(subProperty); + } else { + return null; + } + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TableRename.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TableRename.java new file mode 100644 index 00000000..4f556564 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TableRename.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.tsfile.file.metadata.evolution; + +import java.util.Map; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TableSchemaMap; + +/** + * A schema evolution operation that renames a table in a schema map. + */ +public class TableRename implements SchemaEvolution{ + + static final String KEY_PREFIX = "TableRename:"; + + private final String nameBefore; + private final String nameAfter; + + public TableRename(String nameBefore, String nameAfter) { + this.nameBefore = nameBefore.toLowerCase(); + this.nameAfter = nameAfter.toLowerCase(); + } + + @Override + public void applyTo(TableSchemaMap schemaMap) { + TableSchema schema = schemaMap.remove(nameBefore); + if (schema == null) { + return; + } + + schema.setFinalTableName(nameAfter); + // if the renamed table already exists, then it must be removed previously + TableSchema deletedSchema = schemaMap.remove(nameAfter); + if (deletedSchema != null) { + deletedSchema.setDeleted(true); + } + schemaMap.put(nameAfter, schema); + } + + /** + * @return "SEV:TableRename:{nameBefore}" + */ + @Override + public String propertyKey() { + return SchemaEvolution.KEY_PREFIX + KEY_PREFIX + nameBefore; + } + + /** + * @return "{nameAfter}" + */ + @Override + public String propertyValue() { + return nameAfter; + } + + public static TableRename fromProperty(Map.Entry<String, String> property) { + String key = property.getKey(); + String nameAfter = property.getValue(); + + String nameBefore = key.substring(TableRename.KEY_PREFIX.length()); + + return new TableRename(nameBefore, nameAfter); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java new file mode 100644 index 00000000..d12cfb27 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/evolution/TsFileSchemaRewriter.java @@ -0,0 +1,105 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata.evolution; + +import java.nio.charset.StandardCharsets; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.TsFileMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.TSRecord; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Collections; +import java.util.Map; + +/** + * A utility class to rewrite the schema of an existing TsFile by appending new properties to its + * TsFileMetadata. + */ +public class TsFileSchemaRewriter { + private final String filePath; + + public TsFileSchemaRewriter(String tsfilePath) { + this.filePath = tsfilePath; + } + + /** + * Append new properties to the TsFileMetadata of the TsFile. 
+ * + * @param newProperties the new properties to append + * @throws IOException if an I/O error occurs + */ + public void appendProperties(Map<String, String> newProperties) throws IOException { + // read TsFileMetadata and its position + TsFileMetadata tsFileMetadata; + long metadataOffset; + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + tsFileMetadata = reader.readFileMetadata(); + metadataOffset = reader.getFileMetadataPos(); + } + + // calculate the position of properties and write new properties to a buffer + int propertiesOffset = tsFileMetadata.getPropertiesOffset(); + Map<String, String> mergedProperties = tsFileMetadata.getTsFileProperties(); + mergedProperties.putAll(newProperties); + PublicBAOS newPropertiesBuffer = new PublicBAOS(4096); + DataOutputStream dataOutputStream = new DataOutputStream(newPropertiesBuffer); + ReadWriteIOUtils.writeVar(mergedProperties, dataOutputStream); + byte[] newPropertiesBuf = newPropertiesBuffer.getBuf(); + int newPropertiesSize = newPropertiesBuffer.size(); + // calculate the new metadata size + int newMetadataSize = propertiesOffset + newPropertiesSize; + + try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw")) { + // write the new properties, update the metadata size, and cut off any stale old tail + randomAccessFile.seek(metadataOffset + propertiesOffset); + randomAccessFile.write(newPropertiesBuf, 0, newPropertiesSize); + randomAccessFile.writeInt(newMetadataSize); + randomAccessFile.write(TSFileConfig.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)); + randomAccessFile.setLength(randomAccessFile.getFilePointer()); + } + } + + public static void main(String[] args) throws IOException, WriteProcessException { + String tsfilePath = "test.tsfile"; + try (TsFileWriter writer = new TsFileWriter(new File(tsfilePath))) { + writer.registerTimeseries("d1", new MeasurementSchema("s1", TSDataType.INT32)); + TSRecord record = new TSRecord("d1", 0); + record.addPoint("s1", 100); + writer.writeRecord(record); + } + + TsFileSchemaRewriter rewriter = new
TsFileSchemaRewriter(tsfilePath); + rewriter.appendProperties(Collections.singletonMap("new_property_key", "new_property_value")); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(tsfilePath)) { + TsFileMetadata tsFileMetadata = reader.readFileMetadata(); + System.out.println("Updated TsFile Properties: " + tsFileMetadata.getTsFileProperties()); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/factory/TagFilterBuilder.java b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/factory/TagFilterBuilder.java index 72339574..3b5f0819 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/factory/TagFilterBuilder.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/factory/TagFilterBuilder.java @@ -41,7 +41,7 @@ public class TagFilterBuilder { } private int getIdColumnIndex(String columnName) { - int idColumnOrder = tableSchema.findIdColumnOrder(columnName); + int idColumnOrder = tableSchema.findTagColumnOrder(columnName); if (idColumnOrder == -1) { throw new IllegalArgumentException("Column '" + columnName + "' is not a tag column"); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java index 174cdfd9..ca3b45b4 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java @@ -92,7 +92,7 @@ public class SingleDeviceTsBlockReader implements TsBlockReader { for (String idColumn : task.getColumnMapping().getIdColumns()) { final List<Integer> columnPosInResult = task.getColumnMapping().getColumnPos(idColumn); // the first segment in DeviceId is the table name - final int columnPosInId = task.getTableSchema().findIdColumnOrder(idColumn) + 1; + final int columnPosInId = 
task.getTableSchema().findTagColumnOrder(idColumn) + 1; idColumnContextMap.put(idColumn, new IdColumnContext(columnPosInResult, columnPosInId)); } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/utils/ReadWriteIOUtils.java b/java/tsfile/src/main/java/org/apache/tsfile/utils/ReadWriteIOUtils.java index e6b72e40..4c4261f1 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/utils/ReadWriteIOUtils.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/ReadWriteIOUtils.java @@ -164,6 +164,20 @@ public class ReadWriteIOUtils { return length; } + public static int writeVar(Map<String, String> map, OutputStream stream) throws IOException { + if (map == null) { + return ReadWriteForEncodingUtils.writeVarInt(NO_BYTE_TO_READ, stream); + } + + int length = 0; + length += ReadWriteForEncodingUtils.writeVarInt(map.size(), stream); + for (Entry<String, String> entry : map.entrySet()) { + length += writeVar(entry.getKey(), stream); + length += writeVar(entry.getValue(), stream); + } + return length; + } + public static void write(List<Map<String, String>> maps, OutputStream stream) throws IOException { for (Map<String, String> map : maps) { write(map, stream); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java index 9f36651d..f020fdc6 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java @@ -163,7 +163,7 @@ public class TSRecord { for (DataPoint dataPoint : dataPointList) { String columnName = dataPoint.getMeasurementId(); - int idColumnOrder = schema.findIdColumnOrder(columnName); + int idColumnOrder = schema.findTagColumnOrder(columnName); if (idColumnOrder != -1) { if (!(dataPoint instanceof StringDataPoint)) { throw new ConflictDataTypeException(dataPoint.getType(), TSDataType.STRING); diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/IMeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/IMeasurementSchema.java index ef83b4bb..ff5c7996 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/IMeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/IMeasurementSchema.java @@ -38,6 +38,8 @@ public interface IMeasurementSchema extends Accountable { String getMeasurementName(); + void setMeasurementName(String measurementName); + CompressionType getCompressor(); TSEncoding getEncodingType(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java index 59b1d01a..fbb78594 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java @@ -53,12 +53,16 @@ public class MeasurementSchema + RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class); private String measurementName; + // name after renames + private String finalMeasurementName; private TSDataType dataType; private TSEncoding encoding; private CompressionType compressionType; private TSEncodingBuilder encodingConverter; private Map<String, String> props = null; + private boolean deleted = false; + public MeasurementSchema() {} @TsFileApi @@ -104,6 +108,7 @@ public class MeasurementSchema Map<String, String> props) { this.dataType = dataType; this.measurementName = measurementName; + this.finalMeasurementName = measurementName; this.encoding = encoding; this.props = props; this.compressionType = compressionType; @@ -117,6 +122,7 @@ public class MeasurementSchema Map<String, String> props) { this.dataType = TSDataType.getTsDataType(type); this.measurementName = measurementName; + this.finalMeasurementName = measurementName; this.encoding = TSEncoding.deserialize(encoding); 
this.props = props; this.compressionType = CompressionType.deserialize(compressor); @@ -127,6 +133,7 @@ public class MeasurementSchema MeasurementSchema measurementSchema = new MeasurementSchema(); measurementSchema.measurementName = ReadWriteIOUtils.readString(inputStream); + measurementSchema.finalMeasurementName = measurementSchema.measurementName; measurementSchema.dataType = TSDataType.deserializeFrom(inputStream); @@ -155,6 +162,7 @@ public class MeasurementSchema MeasurementSchema measurementSchema = new MeasurementSchema(); measurementSchema.measurementName = ReadWriteIOUtils.readString(buffer); + measurementSchema.finalMeasurementName = measurementSchema.measurementName; measurementSchema.dataType = TSDataType.deserializeFrom(buffer); @@ -182,6 +190,7 @@ public class MeasurementSchema MeasurementSchema measurementSchema = new MeasurementSchema(); measurementSchema.measurementName = ReadWriteIOUtils.readString(buffer); + measurementSchema.finalMeasurementName = measurementSchema.measurementName; measurementSchema.dataType = TSDataType.deserializeFrom(buffer); @@ -205,6 +214,15 @@ public class MeasurementSchema public void setMeasurementName(String measurementName) { this.measurementName = measurementName; + this.finalMeasurementName = measurementName; + } + + public String getFinalMeasurementName() { + return finalMeasurementName; + } + + public void setFinalMeasurementName(String finalMeasurementName) { + this.finalMeasurementName = finalMeasurementName; } @Override @@ -470,4 +488,12 @@ public class MeasurementSchema RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP, RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY); } + + public boolean isDeleted() { + return deleted; + } + + public void setDeleted(boolean deleted) { + this.deleted = deleted; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java index 777eaf87..f40c4486 100644 
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java @@ -165,6 +165,11 @@ public class VectorMeasurementSchema return deviceId; } + @Override + public void setMeasurementName(String measurementName) { + deviceId = measurementName; + } + @Deprecated // Aligned series should not invoke this method @Override public CompressionType getCompressor() { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index 96cc383e..7137d2f5 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -35,6 +35,7 @@ import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.MeasurementMetadataIndexEntry; import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TableSchemaMap; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.TsFileMetadata; import org.apache.tsfile.file.metadata.enums.CompressionType; @@ -546,7 +547,7 @@ public class TsFileIOWriter implements AutoCloseable { TsFileMetadata tsFileMetadata = new TsFileMetadata(); tsFileMetadata.setTableMetadataIndexNodeMap(tableNodesMap); - tsFileMetadata.setTableSchemaMap(schema.getTableSchemaMap()); + tsFileMetadata.setTableSchemaMap(new TableSchemaMap(schema.getTableSchemaMap())); tsFileMetadata.setMetaOffset(metaOffset); tsFileMetadata.setBloomFilter(filter); tsFileMetadata.addProperty("encryptLevel", encryptLevel); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/VectorMeasurementSchemaStub.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/VectorMeasurementSchemaStub.java 
index 32c43761..61a07710 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/VectorMeasurementSchemaStub.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/VectorMeasurementSchemaStub.java @@ -38,6 +38,10 @@ public class VectorMeasurementSchemaStub extends VectorMeasurementSchema { return ""; } + @Override + public void setMeasurementName(String measurementName) { + } + @Override public CompressionType getCompressor() { return CompressionType.UNCOMPRESSED;
