This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new 662c7054 Add writeTablet for table-view.
662c7054 is described below
commit 662c70548acfde2a12de5b55e11e49b77d5af1e9
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 18:41:48 2024 +0800
Add writeTablet for table-view.
---
...ArrayDeviceID.java => StringArrayDeviceID.java} | 14 ++--
.../java/org/apache/tsfile/utils/WriteUtils.java | 61 ++++++++++++++
.../java/org/apache/tsfile/write/TsFileWriter.java | 23 ++++--
.../write/chunk/AlignedChunkGroupWriterImpl.java | 17 +++-
.../tsfile/write/chunk/IChunkGroupWriter.java | 11 ++-
.../chunk/NonAlignedChunkGroupWriterImpl.java | 17 +++-
.../org/apache/tsfile/write/record/Tablet.java | 95 +++++++++++++++++-----
7 files changed, 195 insertions(+), 43 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ArrayDeviceID.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
similarity index 86%
rename from
tsfile/src/main/java/org/apache/tsfile/file/metadata/ArrayDeviceID.java
rename to
tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
index 4685df9c..de71419c 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ArrayDeviceID.java
+++
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
@@ -24,18 +24,22 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.Objects;
import org.apache.tsfile.exception.TsFileRuntimeException;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.utils.WriteUtils;
-public class ArrayDeviceID implements IDeviceID {
+public class StringArrayDeviceID implements IDeviceID {
private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(ArrayDeviceID.class);
+ RamUsageEstimator.shallowSizeOfInstance(StringArrayDeviceID.class);
- private String[] segments;
+ //TODO: change to Object[] and rename to just ArrayDeviceID
+ // or we can just use a tuple like Relational DB.
+ private final String[] segments;
- public ArrayDeviceID(String[] segments) {
+ public StringArrayDeviceID(String[] segments) {
this.segments = segments;
}
@@ -101,7 +105,7 @@ public class ArrayDeviceID implements IDeviceID {
// the other ID is a prefix of this one
return 1;
}
- final int comp = this.segment(i).compareTo(o.segment(i));
+ final int comp = Objects.compare(this.segment(i), o.segment(i),
WriteUtils::compareStrings);
if (comp != 0) {
// the partial comparison has a result
return comp;
diff --git a/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java
b/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java
new file mode 100644
index 00000000..1cd92b43
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.record.Tablet;
+
+/** Static helpers, e.g. for splitting table-view {@link Tablet}s by device. */
+public class WriteUtils {
+
+  private WriteUtils() {
+    // static utility class; not meant to be instantiated
+  }
+
+  /**
+   * A Tablet for the table-view insert interface may contain more than one device. This method
+   * splits a Tablet by different deviceIds so that the caller can insert them device-by-device.
+   *
+   * <p>Only adjacent rows are grouped: if the rows of one device are not contiguous, that device
+   * appears in several pairs.
+   *
+   * @param tablet the Tablet to split; rows [0, tablet.rowSize) are examined
+   * @return (deviceId, endRowNum) pairs in row order; endRowNum is exclusive and each range starts
+   *     where the previous one ended (the first at row 0). Empty if the Tablet has no rows.
+   */
+  public static List<Pair<IDeviceID, Integer>> splitTabletByDevice(Tablet tablet) {
+    List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
+    if (tablet.rowSize == 0) {
+      // without this guard a (null, 0) pair would be returned for an empty Tablet
+      return result;
+    }
+    IDeviceID lastDeviceID = null;
+    for (int i = 0; i < tablet.rowSize; i++) {
+      final IDeviceID currDeviceID = tablet.getDeviceID(i);
+      if (!currDeviceID.equals(lastDeviceID)) {
+        if (lastDeviceID != null) {
+          result.add(new Pair<>(lastDeviceID, i));
+        }
+        lastDeviceID = currDeviceID;
+      }
+    }
+    // close the range of the last device
+    result.add(new Pair<>(lastDeviceID, tablet.rowSize));
+    return result;
+  }
+
+  /**
+   * Null-safe string comparison: {@code null} sorts before any non-null string, and two non-null
+   * strings are compared with {@link String#compareTo(String)}.
+   *
+   * @param a the first string, may be null
+   * @param b the second string, may be null
+   * @return a negative number, zero, or a positive number as {@code a} is less than, equal to, or
+   *     greater than {@code b}
+   */
+  public static int compareStrings(String a, String b) {
+    if (a == null && b == null) {
+      return 0;
+    }
+    if (a == null) {
+      return -1;
+    }
+    if (b == null) {
+      return 1;
+    }
+    return a.compareTo(b);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index 315dd1ae..df5d9dbd 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -29,6 +29,8 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.MeasurementGroup;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.WriteUtils;
import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
@@ -354,7 +356,7 @@ public class TsFileWriter implements AutoCloseable {
}
private void checkIsTableExist(Tablet tablet) throws WriteProcessException {
- String tableName = tablet.deviceId;
+ String tableName = tablet.insertTargetName;
final TableSchema tableSchema =
getSchema().getTableSchemaMap().get(tableName);
if (tableSchema == null) {
throw new NoTableException(tableName);
@@ -375,9 +377,9 @@ public class TsFileWriter implements AutoCloseable {
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
IChunkGroupWriter groupWriter =
- tryToInitialGroupWriter(new PlainDeviceID(tablet.deviceId), isAligned);
+ tryToInitialGroupWriter(new PlainDeviceID(tablet.insertTargetName),
isAligned);
- Path devicePath = new Path(tablet.deviceId);
+ Path devicePath = new Path(tablet.insertTargetName);
List<MeasurementSchema> schemas = tablet.getSchemas();
if (getSchema().containsDevice(devicePath)) {
checkIsAllMeasurementsInGroup(getSchema().getSeriesSchema(devicePath),
schemas, isAligned);
@@ -543,7 +545,7 @@ public class TsFileWriter implements AutoCloseable {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, false);
// get corresponding ChunkGroupWriter and write this Tablet
- recordCount += groupWriters.get(new
PlainDeviceID(tablet.deviceId)).write(tablet);
+ recordCount += groupWriters.get(new
PlainDeviceID(tablet.insertTargetName)).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}
@@ -551,7 +553,7 @@ public class TsFileWriter implements AutoCloseable {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, true);
// get corresponding ChunkGroupWriter and write this Tablet
- recordCount += groupWriters.get(new
PlainDeviceID(tablet.deviceId)).write(tablet);
+ recordCount += groupWriters.get(new
PlainDeviceID(tablet.insertTargetName)).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}
@@ -678,8 +680,32 @@ public class TsFileWriter implements AutoCloseable {
public boolean writeTable(Tablet tablet) throws IOException,
WriteProcessException {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTableExist(tablet);
- // get corresponding ChunkGroupWriter and write this Tablet
- recordCount += groupWriters.get(new
PlainDeviceID(tablet.deviceId)).write(tablet);
+    // split the tablet by deviceId into (deviceId, exclusive end row) pairs
+    final List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs =
+        WriteUtils.splitTabletByDevice(tablet);
+    int startIndex = 0;
+    for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
+      if (startIndex >= pair.right) {
+        // empty row range (e.g. a tablet without rows): nothing to write
+        continue;
+      }
+      // get the corresponding ChunkGroupWriter and write rows [startIndex, pair.right) of this
+      // Tablet; the leading ID columns are skipped because their values form the deviceId
+      final IChunkGroupWriter groupWriter = groupWriters.get(pair.left);
+      if (groupWriter == null) {
+        // NOTE(review): nothing in this method registers writers for per-row deviceIds;
+        // presumably they should be created here (e.g. via tryToInitialGroupWriter) - confirm.
+        throw new WriteProcessException("No ChunkGroupWriter found for device " + pair.left);
+      }
+      recordCount +=
+          groupWriter.write(
+              tablet,
+              startIndex,
+              pair.right,
+              tablet.getIdColumnRange(),
+              tablet.getSchemas().size());
+      startIndex = pair.right;
+    }
return checkMemorySizeAndMayFlushChunks();
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 1d596881..f23dddbd 100644
---
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -151,7 +151,18 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
@Override
- public int write(Tablet tablet) throws WriteProcessException, IOException {
+ public int write(Tablet tablet) throws IOException, WriteProcessException {
+ return write(tablet, 0, tablet.rowSize, 0, tablet.getSchemas().size());
+ }
+
+ public int write(Tablet tablet, int startRowIndex, int endRowIndex) throws
IOException, WriteProcessException {
+ return write(tablet, startRowIndex, endRowIndex, 0,
tablet.getSchemas().size());
+ }
+
+ @Override
+ public int write(Tablet tablet, int startRowIndex, int endRowIndex, int
startColIndex,
+ int endColIndex) throws WriteProcessException,
+ IOException {
int pointCount = 0;
List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
@@ -164,10 +175,10 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
emptyValueChunkWriters.add(entry.getValue());
}
}
- for (int row = 0; row < tablet.rowSize; row++) {
+ for (int row = startRowIndex; row < endRowIndex; row++) {
long time = tablet.timestamps[row];
checkIsHistoryData(time);
- for (int columnIndex = 0; columnIndex < measurementSchemas.size();
columnIndex++) {
+ for (int columnIndex = startColIndex; columnIndex < endColIndex;
columnIndex++) {
boolean isNull =
tablet.bitMaps != null
&& tablet.bitMaps[columnIndex] != null
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
index c9ca7a92..d20997a7 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
@@ -52,13 +52,43 @@ public interface IChunkGroupWriter {
*/
int write(Tablet tablet) throws WriteProcessException, IOException;
+  /**
+   * Writes rows [startRowIndex, endRowIndex) of all columns of the given Tablet.
+   *
+   * @param tablet the Tablet to take the values from
+   * @param startRowIndex first row to write (inclusive)
+   * @param endRowIndex row to stop at (exclusive)
+   * @return the point count reported by the implementation
+   * @throws WriteProcessException if the implementation rejects the data
+   * @throws IOException exception in IO
+   */
+  int write(Tablet tablet, int startRowIndex, int endRowIndex)
+      throws WriteProcessException, IOException;
+
+  /**
+   * Writes rows [startRowIndex, endRowIndex) of the given Tablet, reading only the columns in
+   * [startColIndex, endColIndex).
+   *
+   * @param tablet the Tablet to take the values from
+   * @param startRowIndex first row to write (inclusive)
+   * @param endRowIndex row to stop at (exclusive)
+   * @param startColIndex first column to write (inclusive)
+   * @param endColIndex column to stop at (exclusive)
+   * @return the point count reported by the implementation
+   * @throws WriteProcessException if the implementation rejects the data
+   * @throws IOException exception in IO
+   */
+  int write(
+      Tablet tablet, int startRowIndex, int endRowIndex, int startColIndex, int endColIndex)
+      throws WriteProcessException, IOException;
+
/**
* flushing method for serializing to local file system or HDFS. Implemented
by
* ChunkWriterImpl.writeToFileWriter().
*
* @param tsfileWriter - TSFileIOWriter
- * @throws IOException exception in IO
* @return current ChunkGroupDataSize
+ * @throws IOException exception in IO
*/
long flushToFileWriter(TsFileIOWriter tsfileWriter) throws IOException;
@@ -81,8 +111,6 @@ public interface IChunkGroupWriter {
/**
* given a measurement descriptor list, create corresponding writers and put
into this
* ChunkGroupWriter.
- *
- * @param measurementSchemas
*/
void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas) throws
IOException;
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 9df9b14d..70cd407e 100644
---
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -90,14 +90,25 @@ public class NonAlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
@Override
- public int write(Tablet tablet) throws WriteProcessException {
+ public int write(Tablet tablet) throws IOException, WriteProcessException {
+ return write(tablet, 0, tablet.rowSize, 0, tablet.getSchemas().size());
+ }
+
+ public int write(Tablet tablet, int startRowIndex, int endRowIndex) throws
IOException, WriteProcessException {
+ return write(tablet, startRowIndex, endRowIndex, 0,
tablet.getSchemas().size());
+ }
+
+ @Override
+ public int write(Tablet tablet, int startRowIndex, int endRowIndex, int
startColIndex,
+ int endColIndex) throws WriteProcessException,
+ IOException {
int maxPointCount = 0, pointCount;
List<MeasurementSchema> timeseries = tablet.getSchemas();
- for (int column = 0; column < timeseries.size(); column++) {
+ for (int column = startColIndex; column < endColIndex; column++) {
String measurementId = timeseries.get(column).getMeasurementId();
TSDataType tsDataType = timeseries.get(column).getType();
pointCount = 0;
- for (int row = 0; row < tablet.rowSize; row++) {
+ for (int row = startRowIndex; row < endRowIndex; row++) {
// check isNull in tablet
if (tablet.bitMaps != null
&& tablet.bitMaps[column] != null
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
index 1ef2ac6c..879e7be6 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
@@ -21,6 +21,8 @@ package org.apache.tsfile.write.record;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
@@ -54,17 +56,17 @@ public class Tablet {
private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not
supported.";
/**
- * DeviceId of this {@link Tablet}
+ * DeviceId if using tree-view interfaces or TableName when using table-view
interfaces.
*/
- public String deviceId;
+ public String insertTargetName;
/**
* The list of {@link MeasurementSchema}s for creating the {@link Tablet}
*/
private List<MeasurementSchema> schemas;
/**
- * Marking the type of each column, namely ID or MEASUREMENT.
- * Notice: the ID columns must be the FIRST ones.
+ * Marking the type of each column, namely ID or MEASUREMENT. Notice: the ID
columns must be the
+ * FIRST ones.
*/
private List<ColumnType> columnTypes;
@@ -103,12 +105,12 @@ public class Tablet {
* Return a {@link Tablet} with default specified row number. This is the
standard constructor
* (all Tablet should be the same size).
*
- * @param deviceId the name of the device specified to be written in
+ * @param insertTargetName the device id (tree-view interfaces) or the table name (table-view interfaces)
* @param schemas the list of {@link MeasurementSchema}s for creating the
tablet, only
* measurementId and type take effects
*/
- public Tablet(String deviceId, List<MeasurementSchema> schemas) {
- this(deviceId, schemas, DEFAULT_SIZE);
+ public Tablet(String insertTargetName, List<MeasurementSchema> schemas) {
+ this(insertTargetName, schemas, DEFAULT_SIZE);
}
/**
@@ -116,13 +118,13 @@ public class Tablet {
* constructor directly for testing purposes. {@link Tablet} should normally
always be default
* size.
*
- * @param deviceId the name of the device specified to be written in
+ * @param insertTargetName the device id (tree-view interfaces) or the table name (table-view interfaces)
* @param schemas the list of {@link MeasurementSchema}s for creating the
row batch, only
* measurementId and type take effects
* @param maxRowNumber the maximum number of rows for this tablet
*/
- public Tablet(String deviceId, List<MeasurementSchema> schemas, int
maxRowNumber) {
- this.deviceId = deviceId;
+ public Tablet(String insertTargetName, List<MeasurementSchema> schemas, int
maxRowNumber) {
+ this.insertTargetName = insertTargetName;
this.schemas = new ArrayList<>(schemas);
setColumnTypes(ColumnType.nCopy(ColumnType.MEASUREMENT, schemas.size()));
this.maxRowNumber = maxRowNumber;
@@ -138,7 +140,7 @@ public class Tablet {
* Return a {@link Tablet} with specified timestamps and values. Only call
this constructor
* directly for Trigger.
*
- * @param deviceId the name of the device specified to be written in
+ * @param insertTargetName the device id (tree-view interfaces) or the table name (table-view interfaces)
* @param schemas the list of {@link MeasurementSchema}s for creating the
row batch, only
* measurementId and type take effects
* @param timestamps given timestamps
@@ -147,25 +149,26 @@ public class Tablet {
* @param maxRowNumber the maximum number of rows for this {@link Tablet}
*/
public Tablet(
- String deviceId,
+ String insertTargetName,
List<MeasurementSchema> schemas,
long[] timestamps,
Object[] values,
BitMap[] bitMaps,
int maxRowNumber) {
- this(deviceId, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT,
schemas.size()), timestamps,
+ this(insertTargetName, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT,
schemas.size()),
+ timestamps,
values, bitMaps, maxRowNumber);
}
public Tablet(
- String deviceId,
+ String insertTargetName,
List<MeasurementSchema> schemas,
List<ColumnType> columnTypes,
long[] timestamps,
Object[] values,
BitMap[] bitMaps,
int maxRowNumber) {
- this.deviceId = deviceId;
+ this.insertTargetName = insertTargetName;
this.schemas = schemas;
setColumnTypes(columnTypes);
this.timestamps = timestamps;
@@ -187,8 +190,8 @@ public class Tablet {
}
}
- public void setDeviceId(String deviceId) {
- this.deviceId = deviceId;
+ public void setInsertTargetName(String insertTargetName) {
+ this.insertTargetName = insertTargetName;
}
public void setSchemas(List<MeasurementSchema> schemas) {
@@ -405,7 +408,7 @@ public class Tablet {
}
public void serialize(DataOutputStream stream) throws IOException {
- ReadWriteIOUtils.write(deviceId, stream);
+ ReadWriteIOUtils.write(insertTargetName, stream);
ReadWriteIOUtils.write(rowSize, stream);
writeMeasurementSchemas(stream);
writeTimes(stream);
@@ -683,7 +686,7 @@ public class Tablet {
boolean flag =
that.rowSize == rowSize
- && Objects.equals(that.deviceId, deviceId)
+ && Objects.equals(that.insertTargetName, insertTargetName)
&& Objects.equals(that.schemas, schemas)
&& Objects.equals(that.measurementIndex, measurementIndex);
if (!flag) {
@@ -846,14 +849,81 @@ public class Tablet {
return true;
}
+  /**
+   * Checks whether a cell holds no value, i.e. whether its column has a {@link BitMap} in which
+   * the row's bit is marked.
+   *
+   * @param i row number
+   * @param j column number
+   * @return true if the value at row i of column j is null
+   */
+  public boolean isNull(int i, int j) {
+    return bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(i);
+  }
+
+  /**
+   * Returns the value of a cell, boxed according to the column's data type (e.g. {@link Binary}
+   * for TEXT, {@link Integer} for INT32).
+   *
+   * @param i row number
+   * @param j column number
+   * @return the value at row i of column j, or null if that cell is null
+   * @throws IllegalArgumentException if the column's data type is not handled here
+   */
+  public Object getValue(int i, int j) {
+    if (isNull(i, j)) {
+      return null;
+    }
+    switch (schemas.get(j).getType()) {
+      case TEXT:
+        return ((Binary[]) values[j])[i];
+      case INT32:
+        return ((int[]) values[j])[i];
+      case FLOAT:
+        return ((float[]) values[j])[i];
+      case DOUBLE:
+        return ((double[]) values[j])[i];
+      case BOOLEAN:
+        return ((boolean[]) values[j])[i];
+      case INT64:
+        return ((long[]) values[j])[i];
+      default:
+        throw new IllegalArgumentException("Unsupported type: " + schemas.get(j).getType());
+    }
+  }
+
+  /**
+   * Builds the device ID of a row from its ID columns, i.e. the first {@link #getIdColumnRange()}
+   * columns. A null ID value becomes a null segment; other values are converted with toString().
+   *
+   * @param i a row number.
+   * @return the IDeviceID of the i-th row.
+   */
+  public IDeviceID getDeviceID(int i) {
+    String[] idArray = new String[idColumnRange];
+    for (int j = 0; j < idColumnRange; j++) {
+      final Object value = getValue(i, j);
+      idArray[j] = value != null ? value.toString() : null;
+    }
+    return new StringArrayDeviceID(idArray);
+  }
+
+  /**
+   * @return the number of leading ID columns, counted by {@link #setColumnTypes(List)} as the
+   *     columns before the first MEASUREMENT column
+   */
+  public int getIdColumnRange() {
+    return idColumnRange;
+  }
+
public void setColumnTypes(List<ColumnType> columnTypes) {
this.columnTypes = columnTypes;
idColumnRange = 0;
- for (int i = 0; i < columnTypes.size(); i++) {
- if (columnTypes.get(i).equals(ColumnType.MEASUREMENT)) {
+ for (ColumnType columnType : columnTypes) {
+ if (columnType.equals(ColumnType.MEASUREMENT)) {
break;
}
- idColumnRange ++;
+ idColumnRange++;
}
}