This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/tsFile_v4 by this push:
     new 662c7054 Add writeTablet for table-view.
662c7054 is described below

commit 662c70548acfde2a12de5b55e11e49b77d5af1e9
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 18:41:48 2024 +0800

    Add writeTablet for table-view.
---
 ...ArrayDeviceID.java => StringArrayDeviceID.java} | 14 ++--
 .../java/org/apache/tsfile/utils/WriteUtils.java   | 61 ++++++++++++++
 .../java/org/apache/tsfile/write/TsFileWriter.java | 23 ++++--
 .../write/chunk/AlignedChunkGroupWriterImpl.java   | 17 +++-
 .../tsfile/write/chunk/IChunkGroupWriter.java      | 11 ++-
 .../chunk/NonAlignedChunkGroupWriterImpl.java      | 17 +++-
 .../org/apache/tsfile/write/record/Tablet.java     | 95 +++++++++++++++++-----
 7 files changed, 195 insertions(+), 43 deletions(-)

diff --git 
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ArrayDeviceID.java 
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
similarity index 86%
rename from 
tsfile/src/main/java/org/apache/tsfile/file/metadata/ArrayDeviceID.java
rename to 
tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
index 4685df9c..de71419c 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ArrayDeviceID.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
@@ -24,18 +24,22 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Objects;
 import org.apache.tsfile.exception.TsFileRuntimeException;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.utils.WriteUtils;
 
-public class ArrayDeviceID implements IDeviceID {
+public class StringArrayDeviceID implements IDeviceID {
 
   private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(ArrayDeviceID.class);
+      RamUsageEstimator.shallowSizeOfInstance(StringArrayDeviceID.class);
 
-  private String[] segments;
+  //TODO: change to Object[] and rename to just ArrayDeviceID
+  // or we can just use a tuple like Relational DB.
+  private final String[] segments;
 
-  public ArrayDeviceID(String[] segments) {
+  public StringArrayDeviceID(String[] segments) {
     this.segments = segments;
   }
 
@@ -101,7 +105,7 @@ public class ArrayDeviceID implements IDeviceID {
         // the other ID is a prefix of this one
         return 1;
       }
-      final int comp = this.segment(i).compareTo(o.segment(i));
+      final int comp = Objects.compare(this.segment(i), o.segment(i), 
WriteUtils::compareStrings);
       if (comp != 0) {
         // the partial comparison has a result
         return comp;
diff --git a/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java 
b/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java
new file mode 100644
index 00000000..1cd92b43
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.record.Tablet;
+
+public class WriteUtils {
+
+  /**
+   * A Tablet for the table-view insert interface may contain more than one 
device. This method
+   * splits a Tablet by different deviceIds so that the caller can insert them 
device-by-device.
+   * @return (deviceId, endRowNum) pairs
+   */
+  public static List<Pair<IDeviceID, Integer>> splitTabletByDevice(Tablet 
tablet) {
+    List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
+    IDeviceID lastDeviceID = null;
+    for (int i = 0; i < tablet.rowSize; i++) {
+      final IDeviceID currDeviceID = tablet.getDeviceID(i);
+      if (!currDeviceID.equals(lastDeviceID)) {
+        if (lastDeviceID != null) {
+          result.add(new Pair<>(lastDeviceID, i));
+        }
+        lastDeviceID = currDeviceID;
+      }
+    }
+    result.add(new Pair<>(lastDeviceID, tablet.rowSize));
+    return result;
+  }
+
+  public static int compareStrings(String a, String b) {
+    if (a == null && b == null) {
+      return 0;
+    }
+    if (a == null) {
+      return -1;
+    }
+    if (b == null) {
+      return 1;
+    }
+    return a.compareTo(b);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java 
b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index 315dd1ae..df5d9dbd 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -29,6 +29,8 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.MeasurementGroup;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.WriteUtils;
 import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
 import org.apache.tsfile.write.chunk.IChunkGroupWriter;
 import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
@@ -354,7 +356,7 @@ public class TsFileWriter implements AutoCloseable {
   }
 
   private void checkIsTableExist(Tablet tablet) throws WriteProcessException {
-    String tableName = tablet.deviceId;
+    String tableName = tablet.insertTargetName;
     final TableSchema tableSchema = 
getSchema().getTableSchemaMap().get(tableName);
     if (tableSchema == null) {
       throw new NoTableException(tableName);
@@ -375,9 +377,9 @@ public class TsFileWriter implements AutoCloseable {
   private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
       throws WriteProcessException, IOException {
     IChunkGroupWriter groupWriter =
-        tryToInitialGroupWriter(new PlainDeviceID(tablet.deviceId), isAligned);
+        tryToInitialGroupWriter(new PlainDeviceID(tablet.insertTargetName), 
isAligned);
 
-    Path devicePath = new Path(tablet.deviceId);
+    Path devicePath = new Path(tablet.insertTargetName);
     List<MeasurementSchema> schemas = tablet.getSchemas();
     if (getSchema().containsDevice(devicePath)) {
       checkIsAllMeasurementsInGroup(getSchema().getSeriesSchema(devicePath), 
schemas, isAligned);
@@ -543,7 +545,7 @@ public class TsFileWriter implements AutoCloseable {
     // make sure the ChunkGroupWriter for this Tablet exist
     checkIsTimeseriesExist(tablet, false);
     // get corresponding ChunkGroupWriter and write this Tablet
-    recordCount += groupWriters.get(new 
PlainDeviceID(tablet.deviceId)).write(tablet);
+    recordCount += groupWriters.get(new 
PlainDeviceID(tablet.insertTargetName)).write(tablet);
     return checkMemorySizeAndMayFlushChunks();
   }
 
@@ -551,7 +553,7 @@ public class TsFileWriter implements AutoCloseable {
     // make sure the ChunkGroupWriter for this Tablet exist
     checkIsTimeseriesExist(tablet, true);
     // get corresponding ChunkGroupWriter and write this Tablet
-    recordCount += groupWriters.get(new 
PlainDeviceID(tablet.deviceId)).write(tablet);
+    recordCount += groupWriters.get(new 
PlainDeviceID(tablet.insertTargetName)).write(tablet);
     return checkMemorySizeAndMayFlushChunks();
   }
 
@@ -678,8 +680,15 @@ public class TsFileWriter implements AutoCloseable {
   public boolean writeTable(Tablet tablet) throws IOException, 
WriteProcessException {
     // make sure the ChunkGroupWriter for this Tablet exist
     checkIsTableExist(tablet);
-    // get corresponding ChunkGroupWriter and write this Tablet
-    recordCount += groupWriters.get(new 
PlainDeviceID(tablet.deviceId)).write(tablet);
+    // spilt the tablet by deviceId
+    final List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs = 
WriteUtils.splitTabletByDevice(tablet);
+    int startIndex = 0;
+    for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
+      // get corresponding ChunkGroupWriter and write this Tablet
+      recordCount += groupWriters.get(pair.left).write(tablet, startIndex, 
pair.right,
+          tablet.getIdColumnRange(), tablet.getSchemas().size());
+      startIndex = pair.right;
+    }
     return checkMemorySizeAndMayFlushChunks();
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 1d596881..f23dddbd 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -151,7 +151,18 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
   }
 
   @Override
-  public int write(Tablet tablet) throws WriteProcessException, IOException {
+  public int write(Tablet tablet) throws IOException, WriteProcessException {
+    return write(tablet, 0, tablet.rowSize, 0, tablet.getSchemas().size());
+  }
+
+  public int write(Tablet tablet, int startRowIndex, int endRowIndex) throws 
IOException, WriteProcessException {
+    return write(tablet, startRowIndex, endRowIndex, 0, 
tablet.getSchemas().size());
+  }
+
+  @Override
+  public int write(Tablet tablet, int startRowIndex, int endRowIndex, int 
startColIndex,
+      int endColIndex) throws WriteProcessException,
+      IOException {
     int pointCount = 0;
     List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
     List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
@@ -164,10 +175,10 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
         emptyValueChunkWriters.add(entry.getValue());
       }
     }
-    for (int row = 0; row < tablet.rowSize; row++) {
+    for (int row = startRowIndex; row < endRowIndex; row++) {
       long time = tablet.timestamps[row];
       checkIsHistoryData(time);
-      for (int columnIndex = 0; columnIndex < measurementSchemas.size(); 
columnIndex++) {
+      for (int columnIndex = startColIndex; columnIndex < endColIndex; 
columnIndex++) {
         boolean isNull =
             tablet.bitMaps != null
                 && tablet.bitMaps[columnIndex] != null
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
index c9ca7a92..d20997a7 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkGroupWriter.java
@@ -52,13 +52,20 @@ public interface IChunkGroupWriter {
    */
   int write(Tablet tablet) throws WriteProcessException, IOException;
 
+  int write(Tablet table, int startRowIndex, int endRowIndex)
+      throws WriteProcessException, IOException;
+
+  int write(Tablet table, int startRowIndex, int endRowIndex, int 
startColIndex, int endColIndex)
+      throws WriteProcessException, IOException;
+
+
   /**
    * flushing method for serializing to local file system or HDFS. Implemented 
by
    * ChunkWriterImpl.writeToFileWriter().
    *
    * @param tsfileWriter - TSFileIOWriter
-   * @throws IOException exception in IO
    * @return current ChunkGroupDataSize
+   * @throws IOException exception in IO
    */
   long flushToFileWriter(TsFileIOWriter tsfileWriter) throws IOException;
 
@@ -81,8 +88,6 @@ public interface IChunkGroupWriter {
   /**
    * given a measurement descriptor list, create corresponding writers and put 
into this
    * ChunkGroupWriter.
-   *
-   * @param measurementSchemas
    */
   void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas) throws 
IOException;
 
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 9df9b14d..70cd407e 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -90,14 +90,25 @@ public class NonAlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
   }
 
   @Override
-  public int write(Tablet tablet) throws WriteProcessException {
+  public int write(Tablet tablet) throws IOException, WriteProcessException {
+    return write(tablet, 0, tablet.rowSize, 0, tablet.getSchemas().size());
+  }
+
+  public int write(Tablet tablet, int startRowIndex, int endRowIndex) throws 
IOException, WriteProcessException {
+    return write(tablet, startRowIndex, endRowIndex, 0, 
tablet.getSchemas().size());
+  }
+
+  @Override
+  public int write(Tablet tablet, int startRowIndex, int endRowIndex, int 
startColIndex,
+      int endColIndex) throws WriteProcessException,
+      IOException {
     int maxPointCount = 0, pointCount;
     List<MeasurementSchema> timeseries = tablet.getSchemas();
-    for (int column = 0; column < timeseries.size(); column++) {
+    for (int column = startColIndex; column < endColIndex; column++) {
       String measurementId = timeseries.get(column).getMeasurementId();
       TSDataType tsDataType = timeseries.get(column).getType();
       pointCount = 0;
-      for (int row = 0; row < tablet.rowSize; row++) {
+      for (int row = startRowIndex; row < endRowIndex; row++) {
         // check isNull in tablet
         if (tablet.bitMaps != null
             && tablet.bitMaps[column] != null
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java 
b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
index 1ef2ac6c..879e7be6 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
@@ -21,6 +21,8 @@ package org.apache.tsfile.write.record;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.BytesUtils;
@@ -54,17 +56,17 @@ public class Tablet {
   private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not 
supported.";
 
   /**
-   * DeviceId of this {@link Tablet}
+   * DeviceId if using tree-view interfaces or TableName when using table-view 
interfaces.
    */
-  public String deviceId;
+  public String insertTargetName;
 
   /**
    * The list of {@link MeasurementSchema}s for creating the {@link Tablet}
    */
   private List<MeasurementSchema> schemas;
   /**
-   * Marking the type of each column, namely ID or MEASUREMENT.
-   * Notice: the ID columns must be the FIRST ones.
+   * Marking the type of each column, namely ID or MEASUREMENT. Notice: the ID 
columns must be the
+   * FIRST ones.
    */
   private List<ColumnType> columnTypes;
 
@@ -103,12 +105,12 @@ public class Tablet {
    * Return a {@link Tablet} with default specified row number. This is the 
standard constructor
    * (all Tablet should be the same size).
    *
-   * @param deviceId the name of the device specified to be written in
+   * @param insertTargetName the name of the device specified to be written in
    * @param schemas the list of {@link MeasurementSchema}s for creating the 
tablet, only
    * measurementId and type take effects
    */
-  public Tablet(String deviceId, List<MeasurementSchema> schemas) {
-    this(deviceId, schemas, DEFAULT_SIZE);
+  public Tablet(String insertTargetName, List<MeasurementSchema> schemas) {
+    this(insertTargetName, schemas, DEFAULT_SIZE);
   }
 
   /**
@@ -116,13 +118,13 @@ public class Tablet {
    * constructor directly for testing purposes. {@link Tablet} should normally 
always be default
    * size.
    *
-   * @param deviceId the name of the device specified to be written in
+   * @param insertTargetName the name of the device specified to be written in
    * @param schemas the list of {@link MeasurementSchema}s for creating the 
row batch, only
    * measurementId and type take effects
    * @param maxRowNumber the maximum number of rows for this tablet
    */
-  public Tablet(String deviceId, List<MeasurementSchema> schemas, int 
maxRowNumber) {
-    this.deviceId = deviceId;
+  public Tablet(String insertTargetName, List<MeasurementSchema> schemas, int 
maxRowNumber) {
+    this.insertTargetName = insertTargetName;
     this.schemas = new ArrayList<>(schemas);
     setColumnTypes(ColumnType.nCopy(ColumnType.MEASUREMENT, schemas.size()));
     this.maxRowNumber = maxRowNumber;
@@ -138,7 +140,7 @@ public class Tablet {
    * Return a {@link Tablet} with specified timestamps and values. Only call 
this constructor
    * directly for Trigger.
    *
-   * @param deviceId the name of the device specified to be written in
+   * @param insertTargetName the name of the device specified to be written in
    * @param schemas the list of {@link MeasurementSchema}s for creating the 
row batch, only
    * measurementId and type take effects
    * @param timestamps given timestamps
@@ -147,25 +149,26 @@ public class Tablet {
    * @param maxRowNumber the maximum number of rows for this {@link Tablet}
    */
   public Tablet(
-      String deviceId,
+      String insertTargetName,
       List<MeasurementSchema> schemas,
       long[] timestamps,
       Object[] values,
       BitMap[] bitMaps,
       int maxRowNumber) {
-    this(deviceId, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT, 
schemas.size()), timestamps,
+    this(insertTargetName, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT, 
schemas.size()),
+        timestamps,
         values, bitMaps, maxRowNumber);
   }
 
   public Tablet(
-      String deviceId,
+      String insertTargetName,
       List<MeasurementSchema> schemas,
       List<ColumnType> columnTypes,
       long[] timestamps,
       Object[] values,
       BitMap[] bitMaps,
       int maxRowNumber) {
-    this.deviceId = deviceId;
+    this.insertTargetName = insertTargetName;
     this.schemas = schemas;
     setColumnTypes(columnTypes);
     this.timestamps = timestamps;
@@ -187,8 +190,8 @@ public class Tablet {
     }
   }
 
-  public void setDeviceId(String deviceId) {
-    this.deviceId = deviceId;
+  public void setInsertTargetName(String insertTargetName) {
+    this.insertTargetName = insertTargetName;
   }
 
   public void setSchemas(List<MeasurementSchema> schemas) {
@@ -405,7 +408,7 @@ public class Tablet {
   }
 
   public void serialize(DataOutputStream stream) throws IOException {
-    ReadWriteIOUtils.write(deviceId, stream);
+    ReadWriteIOUtils.write(insertTargetName, stream);
     ReadWriteIOUtils.write(rowSize, stream);
     writeMeasurementSchemas(stream);
     writeTimes(stream);
@@ -683,7 +686,7 @@ public class Tablet {
 
     boolean flag =
         that.rowSize == rowSize
-            && Objects.equals(that.deviceId, deviceId)
+            && Objects.equals(that.insertTargetName, insertTargetName)
             && Objects.equals(that.schemas, schemas)
             && Objects.equals(that.measurementIndex, measurementIndex);
     if (!flag) {
@@ -846,14 +849,62 @@ public class Tablet {
     return true;
   }
 
+  public boolean isNull(int i, int j) {
+    return bitMaps != null && bitMaps[j] != null && !bitMaps[j].isMarked(i);
+  }
+  /**
+   *
+   * @param i row number
+   * @param j column number
+   * @return the string format of the i-th value in the j-th column.
+   */
+  public Object getValue(int i, int j) {
+    if (isNull(i, j)) {
+      return null;
+    }
+    switch (schemas.get(j).getType()) {
+      case TEXT:
+        return ((Binary[]) values[j])[i];
+      case INT32:
+        return ((int[]) values[j])[i];
+      case FLOAT:
+        return ((float[]) values[j])[i];
+      case DOUBLE:
+        return ((double[]) values[j])[i];
+      case BOOLEAN:
+        return ((boolean[]) values[j])[i];
+      case INT64:
+        return ((long[]) values[j])[i];
+      default:
+        throw new IllegalArgumentException("Unsupported type: " + 
schemas.get(j).getType());
+    }
+  }
+
+  /**
+   * @param i a row number.
+   * @return the IDeviceID of the i-th row.
+   */
+  public IDeviceID getDeviceID(int i) {
+    String[] idArray = new String[idColumnRange];
+    for (int j = 0; j < idColumnRange; j++) {
+      final Object value = getValue(i, j);
+      idArray[j] = value != null ? value.toString() : null;
+    }
+    return new StringArrayDeviceID(idArray);
+  }
+
+  public int getIdColumnRange() {
+    return idColumnRange;
+  }
+
   public void setColumnTypes(List<ColumnType> columnTypes) {
     this.columnTypes = columnTypes;
     idColumnRange = 0;
-    for (int i = 0; i < columnTypes.size(); i++) {
-      if (columnTypes.get(i).equals(ColumnType.MEASUREMENT)) {
+    for (ColumnType columnType : columnTypes) {
+      if (columnType.equals(ColumnType.MEASUREMENT)) {
         break;
       }
-      idColumnRange ++;
+      idColumnRange++;
     }
   }
 

Reply via email to