This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch tsFile_v4 in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit b612ec0686d5c84c958e309f0bceef51b767d773 Author: jt2594838 <[email protected]> AuthorDate: Sun Apr 7 11:58:28 2024 +0800 add TableSchema push Schema in TsFileWriter down to TsFileIOWriter --- .../org/apache/tsfile/file/metadata/IDeviceID.java | 2 + .../apache/tsfile/file/metadata/TableSchema.java | 29 +++++++++++++ .../tsfile/file/metadata/TsFileMetadata.java | 3 ++ .../java/org/apache/tsfile/write/TsFileWriter.java | 48 ++++++++++++---------- .../org/apache/tsfile/write/schema/Schema.java | 13 +++++- .../apache/tsfile/write/writer/TsFileIOWriter.java | 12 ++++++ 6 files changed, 84 insertions(+), 23 deletions(-) diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java index 04d73c19..9008cd8c 100644 --- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java +++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java @@ -38,6 +38,8 @@ public interface IDeviceID extends Comparable<IDeviceID>, Accountable { boolean isEmpty(); + String getTableName(); + static IDeviceID deserializeFrom(ByteBuffer byteBuffer) { return new PlainDeviceID(ReadWriteIOUtils.readVarIntString(byteBuffer)); } diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java new file mode 100644 index 00000000..3ad644fe --- /dev/null +++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata; + +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.util.List; + +public class TableSchema { + protected String tableName; + protected List<MeasurementSchema> columnSchemas; +} diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java index 68bd6dd6..726a1148 100644 --- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java +++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java @@ -26,6 +26,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Map; /** TSFileMetaData collects all metadata info and saves in its data structure. */ public class TsFileMetadata { @@ -35,6 +36,8 @@ public class TsFileMetadata { // List of <name, offset, childMetadataIndexType> private MetadataIndexNode metadataIndex; + private Map<String, MetadataIndexNode> tableMetadataIndexNodeMap; + private Map<String, TableSchema> tableSchemaMap; // offset of MetaMarker.SEPARATOR private long metaOffset; diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java index cdbb8b32..8749f588 100644 --- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java +++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java @@ -61,8 +61,6 @@ public class TsFileWriter implements AutoCloseable { protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); private static final Logger LOG = LoggerFactory.getLogger(TsFileWriter.class); - /** schema of this TsFile. */ - protected final Schema schema; /** IO writer of this TsFile. */ private final TsFileIOWriter fileWriter; @@ -188,9 +186,9 @@ public class TsFileWriter implements AutoCloseable { measurementGroupMap.put(new Path(entry.getKey().getDevice()), group); } } - this.schema = new Schema(measurementGroupMap); + getSchema().setRegisteredTimeseries(measurementGroupMap); } else { - this.schema = schema; + fileWriter.setSchema(schema); } this.pageSize = conf.getPageSizeInByte(); this.chunkGroupSizeThreshold = conf.getGroupSizeInByte(); @@ -206,7 +204,7 @@ public class TsFileWriter implements AutoCloseable { public void registerSchemaTemplate( String templateName, Map<String, MeasurementSchema> template, boolean isAligned) { - schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template)); + getSchema().registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template)); } /** @@ -218,16 +216,16 @@ public class TsFileWriter implements AutoCloseable { * @throws WriteProcessException */ public void registerDevice(String deviceId, String templateName) throws WriteProcessException { - if (!schema.getSchemaTemplates().containsKey(templateName)) { + if (!getSchema().getSchemaTemplates().containsKey(templateName)) { throw new WriteProcessException("given template is not existed! " + templateName); } - if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) { + if (getSchema().getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) { throw new WriteProcessException( "this device " + deviceId + " has been registered, you can only use registerDevice method to register empty device."); } - schema.registerDevice(deviceId, templateName); + getSchema().registerDevice(deviceId, templateName); } /** @@ -240,8 +238,8 @@ public class TsFileWriter implements AutoCloseable { public void registerTimeseries(Path devicePath, MeasurementSchema measurementSchema) throws WriteProcessException { MeasurementGroup measurementGroup; - if (schema.containsDevice(devicePath)) { - measurementGroup = schema.getSeriesSchema(devicePath); + if (getSchema().containsDevice(devicePath)) { + measurementGroup = getSchema().getSeriesSchema(devicePath); if (measurementGroup.isAligned()) { throw new WriteProcessException( "given device " + devicePath + " has been registered for aligned timeseries."); @@ -259,7 +257,7 @@ public class TsFileWriter implements AutoCloseable { measurementGroup .getMeasurementSchemaMap() .put(measurementSchema.getMeasurementId(), measurementSchema); - schema.registerMeasurementGroup(devicePath, measurementGroup); + getSchema().registerMeasurementGroup(devicePath, measurementGroup); } /** @@ -288,8 +286,8 @@ public class TsFileWriter implements AutoCloseable { */ public void registerAlignedTimeseries(Path devicePath, List<MeasurementSchema> measurementSchemas) throws WriteProcessException { - if (schema.containsDevice(devicePath)) { - if (schema.getSeriesSchema(devicePath).isAligned()) { + if (getSchema().containsDevice(devicePath)) { + if (getSchema().getSeriesSchema(devicePath).isAligned()) { throw new WriteProcessException( "given device " + devicePath @@ -306,7 +304,7 @@ public class TsFileWriter implements AutoCloseable { .getMeasurementSchemaMap() .put(measurementSchema.getMeasurementId(), measurementSchema); }); - schema.registerMeasurementGroup(devicePath, measurementGroup); + getSchema().registerMeasurementGroup(devicePath, measurementGroup); } private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned) @@ -318,10 +316,10 @@ public class TsFileWriter implements AutoCloseable { // initial all SeriesWriters of measurements in this TSRecord Path devicePath = new Path(record.deviceId); List<MeasurementSchema> measurementSchemas; - if (schema.containsDevice(devicePath)) { + if (getSchema().containsDevice(devicePath)) { measurementSchemas = checkIsAllMeasurementsInGroup( - record.dataPointList, schema.getSeriesSchema(devicePath), isAligned); + record.dataPointList, getSchema().getSeriesSchema(devicePath), isAligned); if (isAligned) { for (IMeasurementSchema s : measurementSchemas) { if (flushedMeasurementsInDeviceMap.containsKey( @@ -338,10 +336,11 @@ public class TsFileWriter implements AutoCloseable { } } groupWriter.tryToAddSeriesWriter(measurementSchemas); - } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) { + } else if (getSchema().getSchemaTemplates() != null + && getSchema().getSchemaTemplates().size() == 1) { // use the default template without needing to register device MeasurementGroup measurementGroup = - schema.getSchemaTemplates().entrySet().iterator().next().getValue(); + getSchema().getSchemaTemplates().entrySet().iterator().next().getValue(); measurementSchemas = checkIsAllMeasurementsInGroup(record.dataPointList, measurementGroup, isAligned); groupWriter.tryToAddSeriesWriter(measurementSchemas); @@ -358,8 +357,8 @@ public class TsFileWriter implements AutoCloseable { Path devicePath = new Path(tablet.deviceId); List<MeasurementSchema> schemas = tablet.getSchemas(); - if (schema.containsDevice(devicePath)) { - checkIsAllMeasurementsInGroup(schema.getSeriesSchema(devicePath), schemas, isAligned); + if (getSchema().containsDevice(devicePath)) { + checkIsAllMeasurementsInGroup(getSchema().getSeriesSchema(devicePath), schemas, isAligned); if (isAligned) { for (IMeasurementSchema s : schemas) { if (flushedMeasurementsInDeviceMap.containsKey( @@ -376,9 +375,10 @@ public class TsFileWriter implements AutoCloseable { } } groupWriter.tryToAddSeriesWriter(schemas); - } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) { + } else if (getSchema().getSchemaTemplates() != null + && getSchema().getSchemaTemplates().size() == 1) { MeasurementGroup measurementGroup = - schema.getSchemaTemplates().entrySet().iterator().next().getValue(); + getSchema().getSchemaTemplates().entrySet().iterator().next().getValue(); checkIsAllMeasurementsInGroup(measurementGroup, schemas, isAligned); groupWriter.tryToAddSeriesWriter(schemas); } else { @@ -648,4 +648,8 @@ public class TsFileWriter implements AutoCloseable { public TsFileIOWriter getIOWriter() { return this.fileWriter; } + + public Schema getSchema() { + return fileWriter.getSchema(); + } } diff --git a/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java b/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java index 54ab6663..a82ab71d 100644 --- a/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java +++ b/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java @@ -18,6 +18,7 @@ */ package org.apache.tsfile.write.schema; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.utils.MeasurementGroup; @@ -36,11 +37,13 @@ public class Schema implements Serializable { * Path (devicePath) -> measurementSchema By default, use the LinkedHashMap to store the order of * insertion */ - private final Map<Path, MeasurementGroup> registeredTimeseries; + private Map<Path, MeasurementGroup> registeredTimeseries; /** template name -> (measurement -> MeasurementSchema) */ private Map<String, MeasurementGroup> schemaTemplates; + private Map<String, TableSchema> tableSchemaMap = new HashMap<>(); + public Schema() { this.registeredTimeseries = new LinkedHashMap<>(); } @@ -68,6 +71,10 @@ public class Schema implements Serializable { this.schemaTemplates.put(templateName, measurementGroup); } + public void registerTableSchema(String tableName, TableSchema tableSchema) { + tableSchemaMap.put(tableName, tableSchema); + } + /** If template does not exist, an nonAligned timeseries is created by default */ public void extendTemplate(String templateName, MeasurementSchema descriptor) { if (schemaTemplates == null) { @@ -103,6 +110,10 @@ public class Schema implements Serializable { return registeredTimeseries.containsKey(devicePath); } + public void setRegisteredTimeseries(Map<Path, MeasurementGroup> registeredTimeseries) { + this.registeredTimeseries = registeredTimeseries; + } + // for test public Map<Path, MeasurementGroup> getRegisteredTimeseriesMap() { return registeredTimeseries; diff --git a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index ef9515c5..23b3d5ac 100644 --- a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -45,6 +45,7 @@ import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.schema.Schema; import org.apache.tsfile.write.writer.tsmiterator.TSMIterator; import org.apache.commons.io.FileUtils; @@ -84,6 +85,9 @@ public class TsFileIOWriter implements AutoCloseable { VERSION_NUMBER_BYTE = TSFileConfig.VERSION_NUMBER; } + /** schema of this TsFile. */ + protected Schema schema = new Schema(); + protected TsFileOutput out; protected boolean canWrite = true; protected File file; @@ -691,4 +695,12 @@ public class TsFileIOWriter implements AutoCloseable { public TsFileOutput getTsFileOutput() { return this.out; } + + public Schema getSchema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } }
