This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new a1151813 add tablet check
a1151813 is described below
commit a11518139f1f764561b7c7d862e0bcb9b1052274
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 17:27:34 2024 +0800
add tablet check
---
.../exception/write/ConflictDataTypeException.java | 30 +++
.../tsfile/exception/write/NoTableException.java | 8 +
.../java/org/apache/tsfile/write/TsFileWriter.java | 30 +++
.../org/apache/tsfile/write/record/Tablet.java | 210 ++++++++++++++-------
4 files changed, 212 insertions(+), 66 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/tsfile/exception/write/ConflictDataTypeException.java
b/tsfile/src/main/java/org/apache/tsfile/exception/write/ConflictDataTypeException.java
new file mode 100644
index 00000000..0523c234
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/exception/write/ConflictDataTypeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.exception.write;
+
+import org.apache.tsfile.enums.TSDataType;
+
+public class ConflictDataTypeException extends WriteProcessException {
+
+ public ConflictDataTypeException(TSDataType writing, TSDataType registered) {
+ super(String.format("Conflict data type: %s (writing) and %s
(registered)", writing,
+ registered));
+ }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoTableException.java
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoTableException.java
new file mode 100644
index 00000000..43a80796
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoTableException.java
@@ -0,0 +1,8 @@
+package org.apache.tsfile.exception.write;
+
+public class NoTableException extends WriteProcessException{
+
+ public NoTableException(String tableName) {
+ super(String.format("Table %s not found", tableName));
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index 8749f588..315dd1ae 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -20,10 +20,13 @@ package org.apache.tsfile.write;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.exception.write.ConflictDataTypeException;
import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.MeasurementGroup;
import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
@@ -350,6 +353,25 @@ public class TsFileWriter implements AutoCloseable {
return true;
}
+ private void checkIsTableExist(Tablet tablet) throws WriteProcessException {
+ String tableName = tablet.deviceId;
+ final TableSchema tableSchema =
getSchema().getTableSchemaMap().get(tableName);
+ if (tableSchema == null) {
+ throw new NoTableException(tableName);
+ }
+
+ for (MeasurementSchema writingColumnSchema : tablet.getSchemas()) {
+ final int columnIndex =
tableSchema.findColumnIndex(writingColumnSchema.getMeasurementId());
+ if (columnIndex < 0) {
+ throw new
NoMeasurementException(writingColumnSchema.getMeasurementId());
+ }
+ final MeasurementSchema registeredColumnSchema =
tableSchema.getColumnSchemas().get(columnIndex);
+ if
(!writingColumnSchema.getType().equals(registeredColumnSchema.getType())) {
+ throw new ConflictDataTypeException(writingColumnSchema.getType(),
registeredColumnSchema.getType());
+ }
+ }
+ }
+
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
IChunkGroupWriter groupWriter =
@@ -652,4 +674,12 @@ public class TsFileWriter implements AutoCloseable {
public Schema getSchema() {
return fileWriter.getSchema();
}
+
+ public boolean writeTable(Tablet tablet) throws IOException,
WriteProcessException {
+ // make sure the ChunkGroupWriter for this Tablet exist
+ checkIsTableExist(tablet);
+ // get corresponding ChunkGroupWriter and write this Tablet
+ recordCount += groupWriters.get(new
PlainDeviceID(tablet.deviceId)).write(tablet);
+ return checkMemorySizeAndMayFlushChunks();
+ }
}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
index 61ab2e41..1ef2ac6c 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
@@ -53,24 +53,50 @@ public class Tablet {
private static final int DEFAULT_SIZE = 1024;
private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not
supported.";
- /** DeviceId of this {@link Tablet} */
+ /**
+ * DeviceId of this {@link Tablet}
+ */
public String deviceId;
- /** The list of {@link MeasurementSchema}s for creating the {@link Tablet} */
+ /**
+ * The list of {@link MeasurementSchema}s for creating the {@link Tablet}
+ */
private List<MeasurementSchema> schemas;
+ /**
+ * Marking the type of each column, namely ID or MEASUREMENT.
+ * Notice: the ID columns must be the FIRST ones.
+ */
+ private List<ColumnType> columnTypes;
- /** MeasurementId->indexOf({@link MeasurementSchema}) */
+ /**
+ * Columns in [0, idColumnRange) are all ID columns.
+ */
+ private int idColumnRange;
+
+ /**
+ * MeasurementId->indexOf({@link MeasurementSchema})
+ */
private final Map<String, Integer> measurementIndex;
- /** Timestamps in this {@link Tablet} */
+ /**
+ * Timestamps in this {@link Tablet}
+ */
public long[] timestamps;
- /** Each object is a primitive type array, which represents values of one
measurement */
+ /**
+ * Each object is a primitive type array, which represents values of one
measurement
+ */
public Object[] values;
- /** Each {@link BitMap} represents the existence of each value in the
current column. */
+ /**
+ * Each {@link BitMap} represents the existence of each value in the current
column.
+ */
public BitMap[] bitMaps;
- /** The number of rows to include in this {@link Tablet} */
+ /**
+ * The number of rows to include in this {@link Tablet}
+ */
public int rowSize;
- /** The maximum number of rows for this {@link Tablet} */
+ /**
+ * The maximum number of rows for this {@link Tablet}
+ */
private final int maxRowNumber;
/**
@@ -79,7 +105,7 @@ public class Tablet {
*
* @param deviceId the name of the device specified to be written in
* @param schemas the list of {@link MeasurementSchema}s for creating the
tablet, only
- * measurementId and type take effects
+ * measurementId and type take effects
*/
public Tablet(String deviceId, List<MeasurementSchema> schemas) {
this(deviceId, schemas, DEFAULT_SIZE);
@@ -92,12 +118,13 @@ public class Tablet {
*
* @param deviceId the name of the device specified to be written in
* @param schemas the list of {@link MeasurementSchema}s for creating the
row batch, only
- * measurementId and type take effects
+ * measurementId and type take effects
* @param maxRowNumber the maximum number of rows for this tablet
*/
public Tablet(String deviceId, List<MeasurementSchema> schemas, int
maxRowNumber) {
this.deviceId = deviceId;
this.schemas = new ArrayList<>(schemas);
+ setColumnTypes(ColumnType.nCopy(ColumnType.MEASUREMENT, schemas.size()));
this.maxRowNumber = maxRowNumber;
measurementIndex = new HashMap<>();
constructMeasurementIndexMap();
@@ -113,7 +140,7 @@ public class Tablet {
*
* @param deviceId the name of the device specified to be written in
* @param schemas the list of {@link MeasurementSchema}s for creating the
row batch, only
- * measurementId and type take effects
+ * measurementId and type take effects
* @param timestamps given timestamps
* @param values given values
* @param bitMaps given {@link BitMap}s
@@ -126,8 +153,21 @@ public class Tablet {
Object[] values,
BitMap[] bitMaps,
int maxRowNumber) {
+ this(deviceId, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT,
schemas.size()), timestamps,
+ values, bitMaps, maxRowNumber);
+ }
+
+ public Tablet(
+ String deviceId,
+ List<MeasurementSchema> schemas,
+ List<ColumnType> columnTypes,
+ long[] timestamps,
+ Object[] values,
+ BitMap[] bitMaps,
+ int maxRowNumber) {
this.deviceId = deviceId;
this.schemas = schemas;
+ setColumnTypes(columnTypes);
this.timestamps = timestamps;
this.values = values;
this.bitMaps = bitMaps;
@@ -138,6 +178,7 @@ public class Tablet {
constructMeasurementIndexMap();
}
+
private void constructMeasurementIndexMap() {
int indexInSchema = 0;
for (MeasurementSchema schema : schemas) {
@@ -186,49 +227,43 @@ public class Tablet {
bitMaps[indexOfSchema].mark(rowIndex);
}
switch (dataType) {
- case TEXT:
- {
- Binary[] sensor = (Binary[]) values[indexOfSchema];
- if (value instanceof Binary) {
- sensor[rowIndex] = (Binary) value;
- } else {
- sensor[rowIndex] =
- value != null
- ? new Binary((String) value, TSFileConfig.STRING_CHARSET)
- : Binary.EMPTY_VALUE;
- }
- break;
- }
- case FLOAT:
- {
- float[] sensor = (float[]) values[indexOfSchema];
- sensor[rowIndex] = value != null ? (float) value : Float.MIN_VALUE;
- break;
- }
- case INT32:
- {
- int[] sensor = (int[]) values[indexOfSchema];
- sensor[rowIndex] = value != null ? (int) value : Integer.MIN_VALUE;
- break;
- }
- case INT64:
- {
- long[] sensor = (long[]) values[indexOfSchema];
- sensor[rowIndex] = value != null ? (long) value : Long.MIN_VALUE;
- break;
- }
- case DOUBLE:
- {
- double[] sensor = (double[]) values[indexOfSchema];
- sensor[rowIndex] = value != null ? (double) value : Double.MIN_VALUE;
- break;
- }
- case BOOLEAN:
- {
- boolean[] sensor = (boolean[]) values[indexOfSchema];
- sensor[rowIndex] = value != null && (boolean) value;
- break;
+ case TEXT: {
+ Binary[] sensor = (Binary[]) values[indexOfSchema];
+ if (value instanceof Binary) {
+ sensor[rowIndex] = (Binary) value;
+ } else {
+ sensor[rowIndex] =
+ value != null
+ ? new Binary((String) value, TSFileConfig.STRING_CHARSET)
+ : Binary.EMPTY_VALUE;
}
+ break;
+ }
+ case FLOAT: {
+ float[] sensor = (float[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? (float) value : Float.MIN_VALUE;
+ break;
+ }
+ case INT32: {
+ int[] sensor = (int[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? (int) value : Integer.MIN_VALUE;
+ break;
+ }
+ case INT64: {
+ long[] sensor = (long[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? (long) value : Long.MIN_VALUE;
+ break;
+ }
+ case DOUBLE: {
+ double[] sensor = (double[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? (double) value : Double.MIN_VALUE;
+ break;
+ }
+ case BOOLEAN: {
+ boolean[] sensor = (boolean[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null && (boolean) value;
+ break;
+ }
default:
throw new
UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
}
@@ -238,12 +273,16 @@ public class Tablet {
return schemas;
}
- /** Return the maximum number of rows for this tablet */
+ /**
+ * Return the maximum number of rows for this tablet
+ */
public int getMaxRowNumber() {
return maxRowNumber;
}
- /** Reset Tablet to the default state - set the rowSize to 0 and reset
bitMaps */
+ /**
+ * Reset Tablet to the default state - set the rowSize to 0 and reset bitMaps
+ */
public void reset() {
rowSize = 0;
if (bitMaps != null) {
@@ -304,7 +343,9 @@ public class Tablet {
return rowSize * 8;
}
- /** @return Total bytes of values */
+ /**
+ * @return Total bytes of values
+ */
public int getTotalValueOccupation() {
int valueOccupation = 0;
int columnIndex = 0;
@@ -352,7 +393,9 @@ public class Tablet {
return valueOccupation;
}
- /** Serialize {@link Tablet} */
+ /**
+ * Serialize {@link Tablet}
+ */
public ByteBuffer serialize() throws IOException {
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
@@ -370,7 +413,9 @@ public class Tablet {
writeValues(stream);
}
- /** Serialize {@link MeasurementSchema}s */
+ /**
+ * Serialize {@link MeasurementSchema}s
+ */
private void writeMeasurementSchemas(DataOutputStream stream) throws
IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream);
if (schemas != null) {
@@ -395,7 +440,9 @@ public class Tablet {
}
}
- /** Serialize {@link BitMap}s */
+ /**
+ * Serialize {@link BitMap}s
+ */
private void writeBitMaps(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
if (bitMaps != null) {
@@ -412,7 +459,9 @@ public class Tablet {
}
}
- /** Serialize values */
+ /**
+ * Serialize values
+ */
private void writeValues(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(values != null), stream);
if (values != null) {
@@ -475,7 +524,9 @@ public class Tablet {
}
}
- /** Deserialize Tablet */
+ /**
+ * Deserialize Tablet
+ */
public static Tablet deserialize(ByteBuffer byteBuffer) {
String deviceId = ReadWriteIOUtils.readString(byteBuffer);
int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
@@ -524,7 +575,9 @@ public class Tablet {
return tablet;
}
- /** deserialize bitmaps */
+ /**
+ * deserialize bitmaps
+ */
public static BitMap[] readBitMapsFromBuffer(ByteBuffer byteBuffer, int
columns) {
BitMap[] bitMaps = new BitMap[columns];
for (int i = 0; i < columns; i++) {
@@ -609,10 +662,11 @@ public class Tablet {
}
/**
- * Note that the function will judge 2 {@link Tablet}s to be equal when
their contents are logically the
- * same. Namely, a {@link Tablet} with {@link BitMap} "null" may be equal to
another {@link Tablet} with 3 columns and
- * {@link BitMap "[null, null, null]", and a {@link Tablet} with rowSize 2
is judged identical to other {@link Tablet}s
- * regardless of any timeStamps with indexes larger than or equal to 2.
+ * Note that the function will judge 2 {@link Tablet}s to be equal when
their contents are
+ * logically the same. Namely, a {@link Tablet} with {@link BitMap} "null"
may be equal to another
+ * {@link Tablet} with 3 columns and
+ * {@link BitMap "[null, null, null]", and a {@link Tablet} with rowSize 2
is judged identical to
+ * other {@link Tablet}s regardless of any timeStamps with indexes larger
than or equal to 2.
*
* @param o the tablet to compare
* @return {@code true} if the tablets are logically equal
@@ -791,4 +845,28 @@ public class Tablet {
}
return true;
}
+
+ public void setColumnTypes(List<ColumnType> columnTypes) {
+ this.columnTypes = columnTypes;
+ idColumnRange = 0;
+ for (int i = 0; i < columnTypes.size(); i++) {
+ if (columnTypes.get(i).equals(ColumnType.MEASUREMENT)) {
+ break;
+ }
+ idColumnRange ++;
+ }
+ }
+
+ public enum ColumnType {
+ ID,
+ MEASUREMENT;
+
+ public static List<ColumnType> nCopy(ColumnType type, int n) {
+ List<ColumnType> result = new ArrayList<>(n);
+ for (int i = 0; i < n; i++) {
+ result.add(type);
+ }
+ return result;
+ }
+ }
}