This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch new_java_interfaces in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 5839f52788a7e14d3d048a2e199467bd60fbe7b1 Author: Tian Jiang <[email protected]> AuthorDate: Wed Apr 23 16:04:46 2025 +0800 add table model insert record --- .../main/java/org/apache/tsfile/utils/Binary.java | 3 ++ .../apache/tsfile/file/metadata/TableSchema.java | 9 +++++ .../write/chunk/AlignedChunkGroupWriterImpl.java | 3 ++ .../org/apache/tsfile/write/record/TSRecord.java | 39 ++++++++++++++++++++-- .../tsfile/write/record/datapoint/DataPoint.java | 11 ++++++ .../tsfile/write/v4/DeviceTableModelWriter.java | 38 +++++++++++++++++++++ .../org/apache/tsfile/write/v4/ITsFileWriter.java | 4 +++ .../apache/tsfile/write/TsFileWriteApiTest.java | 31 +++++++++++++++++ 8 files changed, 135 insertions(+), 3 deletions(-) diff --git a/java/common/src/main/java/org/apache/tsfile/utils/Binary.java b/java/common/src/main/java/org/apache/tsfile/utils/Binary.java index f65d19c3..07337174 100644 --- a/java/common/src/main/java/org/apache/tsfile/utils/Binary.java +++ b/java/common/src/main/java/org/apache/tsfile/utils/Binary.java @@ -109,6 +109,9 @@ public class Binary implements Comparable<Binary>, Serializable, Accountable { @Override public String toString() { + if (values == null) { + return null; + } // use UTF_8 by default since toString do not provide parameter return getStringValue(StandardCharsets.UTF_8); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java index 81964473..70ddc690 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java @@ -53,6 +53,7 @@ public class TableSchema { private Map<String, Integer> columnPosIndex; // columnName -> pos in all id columns private Map<String, Integer> idColumnOrder; + private int tagColumnCnt = -1; public TableSchema(String tableName) { this.tableName = tableName.toLowerCase(); @@ -331,4 +332,12 @@ public class TableSchema { public int hashCode() { return Objects.hash(tableName, measurementSchemas, columnCategories); } + + public int getTagColumnCnt() { + if (tagColumnCnt != -1) { + return tagColumnCnt; + } + tagColumnCnt = (int) columnCategories.stream().filter(c -> c == ColumnCategory.TAG).count(); + return tagColumnCnt; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java index a47df975..77858786 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java @@ -165,6 +165,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter { ? point.getMeasurementId().toLowerCase() : point.getMeasurementId(); ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(measurementId); + if (valueChunkWriter == null) { + valueChunkWriter = tryToAddSeriesWriterInternal(point.getMeasurementSchema()); + } switch (point.getType()) { case BOOLEAN: valueChunkWriter.write(time, (boolean) point.getValue(), isNull); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java index b7c24831..ed778d56 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java @@ -18,10 +18,15 @@ */ package org.apache.tsfile.write.record; +import org.apache.tsfile.annotations.TableModel; import org.apache.tsfile.annotations.TsFileApi; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.ConflictDataTypeException; +import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.StringContainer; import org.apache.tsfile.write.record.datapoint.BooleanDataPoint; @@ -49,19 +54,22 @@ public class TSRecord { /** deviceId of this TSRecord. */ public IDeviceID deviceId; + private String tableName; + /** all value of this TSRecord. */ public List<DataPoint> dataPointList = new ArrayList<>(); /** * constructor of TSRecord. * - * @param deviceId deviceId of this TSRecord + * @param deviceIdOrTableName deviceId of this TSRecord * @param timestamp timestamp of this TSRecord */ @TsFileApi - public TSRecord(String deviceId, long timestamp) { + public TSRecord(String deviceIdOrTableName, long timestamp) { this.time = timestamp; - this.deviceId = Factory.DEFAULT_FACTORY.create(deviceId); + this.deviceId = Factory.DEFAULT_FACTORY.create(deviceIdOrTableName); + this.tableName = deviceIdOrTableName; } @TsFileApi @@ -142,4 +150,29 @@ public class TSRecord { sc.addTail("]}"); return sc.toString(); } + + @TableModel + public IDeviceID getDeviceId(TableSchema schema) throws WriteProcessException { + int tagCnt = schema.getTagColumnCnt(); + String[] idSegments = new String[tagCnt + 1]; + idSegments[0] = schema.getTableName(); + + for (DataPoint dataPoint : dataPointList) { + String columnName = dataPoint.getMeasurementId(); + int idColumnOrder = schema.findIdColumnOrder(columnName); + if (idColumnOrder != -1) { + if (!(dataPoint instanceof StringDataPoint)) { + throw new ConflictDataTypeException(dataPoint.getType(), TSDataType.STRING); + } + Object value = dataPoint.getValue(); + idSegments[idColumnOrder + 1] = value != null ? value.toString() : null; + } + } + + return Factory.DEFAULT_FACTORY.create(idSegments); + } + + public String getTableName() { + return tableName.toLowerCase(); + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java index 80e617ca..05352c5e 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java @@ -29,6 +29,7 @@ import org.apache.tsfile.write.chunk.ChunkWriterImpl; import java.io.IOException; import java.time.LocalDate; +import org.apache.tsfile.write.schema.IMeasurementSchema; /** * This is a abstract class representing a data point. DataPoint consists of a measurement id and a @@ -43,6 +44,8 @@ public abstract class DataPoint { /** measurementId of this DataPoint. */ protected final String measurementId; + protected IMeasurementSchema measurementSchema; + /** * constructor of DataPoint. * @@ -157,4 +160,12 @@ public abstract class DataPoint { public void setDate(LocalDate value) { throw new UnsupportedOperationException("set Date not support in DataPoint"); } + + public IMeasurementSchema getMeasurementSchema() { + return measurementSchema; + } + + public void setMeasurementSchema(IMeasurementSchema measurementSchema) { + this.measurementSchema = measurementSchema; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java index 66fca2cf..1752d699 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java @@ -29,7 +29,9 @@ import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.WriteUtils; +import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.record.datapoint.DataPoint; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.io.File; @@ -74,6 +76,39 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter { checkMemorySizeAndMayFlushChunks(); } + @TsFileApi + @Override + public void write(TSRecord record) throws IOException, WriteProcessException { + String tableName = record.getTableName(); + if (tableName == null) { + tableName = this.tableName; + } + if (tableName == null) { + throw new WriteProcessException("Table name is null"); + } + + final TableSchema tableSchema = getSchema().getTableSchemaMap().get(tableName); + if (tableSchema == null) { + throw new NoTableException(tableName); + } + + IDeviceID deviceId = record.getDeviceId(tableSchema); + List<DataPoint> fieldDataPoints = new ArrayList<>(record.dataPointList.size()); + for (DataPoint dataPoint : record.dataPointList) { + int columnIndex = tableSchema.findColumnIndex(dataPoint.getMeasurementId()); + if (columnIndex < 0) { + throw new NoMeasurementException(dataPoint.getMeasurementId()); + } + ColumnCategory columnCategory = tableSchema.getColumnTypes().get(columnIndex); + if (columnCategory == ColumnCategory.FIELD) { + fieldDataPoints.add(dataPoint); + dataPoint.setMeasurementSchema(tableSchema.getColumnSchemas().get(columnIndex)); + } + } + recordCount += tryToInitialGroupWriter(deviceId, isTableWriteAligned, true).write(record.time, fieldDataPoints); + checkMemorySizeAndMayFlushChunks(); + } + private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet) throws WriteProcessException { String tabletTableName = tablet.getTableName(); @@ -82,6 +117,9 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter { } tablet.setTableName(this.tableName); final TableSchema tableSchema = getSchema().getTableSchemaMap().get(tableName); + if (tableSchema == null) { + throw new NoTableException(tabletTableName); + } List<ColumnCategory> columnCategoryListForTablet = new ArrayList<>(tablet.getSchemas().size()); for (IMeasurementSchema writingColumnSchema : tablet.getSchemas()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java index 8610ee1f..25ccb031 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java @@ -21,6 +21,7 @@ package org.apache.tsfile.write.v4; import org.apache.tsfile.annotations.TsFileApi; import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.record.Tablet; import java.io.IOException; @@ -32,4 +33,7 @@ public interface ITsFileWriter extends AutoCloseable { @TsFileApi void close(); + + @TsFileApi + void write(TSRecord record) throws IOException, WriteProcessException; } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java index 17a0d0f1..11f1b779 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java @@ -971,4 +971,35 @@ public class TsFileWriteApiTest { Assert.assertTrue(resultSet.isNull(3)); } } + + @Test + public void writeRecord() + throws IOException, WriteProcessException, ReadProcessException { + setEnv(100 * 1024 * 1024, 10 * 1024); + + TableSchema tableSchema = + new TableSchema( + "Table1", + Arrays.asList( + new ColumnSchema("tag1", TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("field1", TSDataType.BOOLEAN, ColumnCategory.FIELD))); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(f).tableSchema(tableSchema).build()) { + writer.write(new TSRecord("Table1", 0).addPoint("tag1", "d1").addPoint("field1", true)); + writer.write(new TSRecord("Table1", 1).addPoint("tag1", "d2").addPoint("field1", false)); + } + try (ITsFileReader reader = new TsFileReaderBuilder().file(f).build(); + ResultSet resultSet = + reader.query( + "table1", Arrays.asList("tag1", "field1"), Long.MIN_VALUE, Long.MAX_VALUE)) { + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(0, resultSet.getLong(1)); + Assert.assertEquals("d1", resultSet.getString(2)); + Assert.assertTrue(resultSet.getBoolean(3)); + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(1, resultSet.getLong(1)); + Assert.assertEquals("d2", resultSet.getString(2)); + Assert.assertFalse(resultSet.getBoolean(3)); + } + } }
