Copilot commented on code in PR #753:
URL: https://github.com/apache/tsfile/pull/753#discussion_r2998487478


##########
java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java:
##########
@@ -33,4 +39,65 @@ public TableChunkGroupWriterImpl(IDeviceID deviceId, 
EncryptParameter encryptPar
     super(deviceId, encryptParam);
     setConvertColumnNameToLowerCase(true);
   }
+
+  public int write(
+      Column timeColumn,
+      Column[] valueColumns,
+      List<IMeasurementSchema> measurementSchemas,
+      int startRowIndex,
+      int endRowIndex)
+      throws IOException {
+    int pointCount = 0;
+    ValueChunkWriter[] valueChunkWriters = new 
ValueChunkWriter[valueColumns.length];
+    for (int i = 0; i < measurementSchemas.size(); i++) {
+      valueChunkWriters[i] = 
tryToAddSeriesWriterInternal(measurementSchemas.get(i));
+    }
+    for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
+      long time = timeColumn.getLong(rowIndex);
+      for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; 
valueColumnIndex++) {
+        Column valueColumn = valueColumns[valueColumnIndex];
+        IMeasurementSchema measurementSchema = 
measurementSchemas.get(valueColumnIndex);
+        ValueChunkWriter valueChunkWriter = valueChunkWriters[rowIndex];
+        boolean isNull = valueColumn.isNull(rowIndex);

Review Comment:
   In `write(...)`, the `ValueChunkWriter` is indexed by `rowIndex` 
(`valueChunkWriters[rowIndex]`) instead of by `valueColumnIndex`. This will 
throw `ArrayIndexOutOfBoundsException` as soon as `endRowIndex` exceeds the 
number of columns, and it also writes data into the wrong series. Use the 
value-column index when selecting the writer, and consider validating 
`measurementSchemas.size()` matches `valueColumns.length` before 
initializing/iterating to avoid AIOOB or null writers. Also, this method 
currently skips the out-of-order check (`checkIsHistoryData(time)`) that other 
chunk writers perform, so out-of-order writes may silently corrupt state; it 
should apply the same check (and adjust the throws signature accordingly).



##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert TsBlock (table model) into TsFile format. Core responsibilities: 1. 
Split TsBlock by
+ * device (based on tag columns) 2. Optionally generate a new time column per 
device 3. Dispatch
+ * rows to corresponding ChunkGroupWriter
+ */
+public class TableTsBlock2TsFileWriter extends DeviceTableModelWriter {
+
+  private final boolean generateNewTimeColumn;
+  private final int timeColumnIndexInTsBlock;
+  private final int[] tagColumnIndexInTsBlock;
+  private final int[] fieldColumnIndexInTsBlock;
+  private final IMeasurementSchema[] fieldColumnSchemas;
+
+  private final String tableName;
+  private final Map<IDeviceID, Long> deviceRowCountMap;
+
+  private int rowCount = 0;
+
+  public TableTsBlock2TsFileWriter(
+      File file,
+      TableSchema tableSchema,
+      long memoryThreshold,
+      boolean generateNewTimeColumn,
+      int timeColumnIndexInTsBlock,
+      int[] tagColumnIndexesInTsBlock,
+      int[] fieldColumnIndexesInTsBlock,
+      IMeasurementSchema[] fieldColumnSchemas)
+      throws IOException {
+    super(file, tableSchema, memoryThreshold);
+    this.tableName = tableSchema.getTableName();
+    this.generateNewTimeColumn = generateNewTimeColumn;
+    this.timeColumnIndexInTsBlock = timeColumnIndexInTsBlock;
+    this.tagColumnIndexInTsBlock = tagColumnIndexesInTsBlock;
+    this.fieldColumnIndexInTsBlock = fieldColumnIndexesInTsBlock;
+    this.deviceRowCountMap = generateNewTimeColumn ? new HashMap<>() : null;
+    this.fieldColumnSchemas = fieldColumnSchemas;
+  }
+
+  public void write(TsBlock tsBlock) throws IOException, WriteProcessException 
{
+    if (tsBlock == null || tsBlock.isEmpty()) {
+      return;
+    }
+    // Split TsBlock into device partitions and prepare time column
+    Pair<Column, List<Pair<IDeviceID, Integer>>> 
timeColumnAndDeviceIdEndIndexPairs =
+        splitTsBlockByDeviceAndGetTimeColumn(tsBlock);
+    Column timeColumn = timeColumnAndDeviceIdEndIndexPairs.left;
+    // Extract value columns according to schema mapping
+    Column[] valueColumns = new Column[fieldColumnIndexInTsBlock.length];
+    for (int i = 0; i < valueColumns.length; i++) {
+      valueColumns[i] = tsBlock.getColumn(fieldColumnIndexInTsBlock[i]);
+    }
+    List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs = 
timeColumnAndDeviceIdEndIndexPairs.right;
+    int startIndex = 0;
+
+    // Iterate each device segment and write data into its ChunkGroup
+    for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
+      TableTsBlockChunkGroupWriterImpl chunkGroupWriter =
+          (TableTsBlockChunkGroupWriterImpl) 
tryToInitialGroupWriter(pair.left, true, true);
+      int writeCount = chunkGroupWriter.write(timeColumn, valueColumns, 
startIndex, pair.right);
+      rowCount += writeCount;
+      recordCount += writeCount;
+      startIndex = pair.right;
+    }
+
+    this.checkMemorySizeAndMayFlushChunks();
+  }
+
+  /**
+   * Split TsBlock by device boundary. If generateNewTimeColumn is true, 
generate a monotonically
+   * increasing time column per device using deviceRowCountMap.
+   *
+   * @return Pair of (time column, device -> end index list)
+   */
+  private Pair<Column, List<Pair<IDeviceID, Integer>>> 
splitTsBlockByDeviceAndGetTimeColumn(
+      TsBlock tsBlock) {
+    long[] timestamps = null;
+    if (generateNewTimeColumn) {
+      timestamps = new long[tsBlock.getPositionCount()];
+    }
+    List<Pair<IDeviceID, Integer>> deviceSplitResult = new ArrayList<>();
+    IDeviceID lastDeviceID = null;
+    long lastDeviceCount = 0;
+
+    // Iterate rows and detect device boundary changes
+    for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+      IDeviceID currDeviceID = getDeviceId(tsBlock, i);
+      // Device changed, flush previous segment
+      if (!currDeviceID.equals(lastDeviceID)) {
+        if (lastDeviceID != null) {
+          deviceSplitResult.add(new Pair(lastDeviceID, i));
+          if (generateNewTimeColumn) {
+            deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+          }
+        }
+        lastDeviceID = currDeviceID;
+        if (generateNewTimeColumn) {
+          lastDeviceCount = deviceRowCountMap.getOrDefault(lastDeviceID, 0L);
+        }
+      }
+      // Generate synthetic time if required
+      if (generateNewTimeColumn) {
+        timestamps[i] = lastDeviceCount++;
+      }
+    }
+
+    deviceSplitResult.add(new Pair(lastDeviceID, tsBlock.getPositionCount()));
+    if (generateNewTimeColumn) {
+      deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+      return new Pair<>(new TimeColumn(timestamps.length, timestamps), 
deviceSplitResult);
+    } else {
+      return new Pair<>(tsBlock.getColumn(timeColumnIndexInTsBlock), 
deviceSplitResult);
+    }
+  }
+
+  private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) {
+    String[] segments = new String[tagColumnIndexInTsBlock.length + 1];
+    segments[0] = tableName;
+    for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) {
+      segments[i + 1] =
+          tsBlock
+              .getValueColumns()[tagColumnIndexInTsBlock[i]]
+              .getBinary(rowIdx)
+              .getStringValue(TSFileConfig.STRING_CHARSET);

Review Comment:
   `getDeviceId` assumes every tag column is a non-null STRING/BINARY and 
unconditionally calls `getBinary(rowIdx).getStringValue(...)`. If a tag value 
is null (or the tag column type isn’t binary-compatible), this will throw at 
runtime. Consider following the `Tablet#getDeviceID` approach: check 
`isNull(rowIdx)` and use `getObject(rowIdx)`/`toString()` (or otherwise handle 
non-string tag types) so device ID generation is robust to nulls and supported 
tag types.
   ```suggestion
         Column tagColumn = 
tsBlock.getValueColumns()[tagColumnIndexInTsBlock[i]];
         if (tagColumn.isNull(rowIdx)) {
           segments[i + 1] = null;
         } else {
           Object value = tagColumn.getObject(rowIdx);
           segments[i + 1] = value == null ? null : value.toString();
         }
   ```



##########
java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.TsFileGeneratorForTest;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class TableTsBlock2TsFileWriterTest {
+
+  private String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 
0, 0);
+
+  @After
+  public void tearDown() throws Exception {
+    Files.deleteIfExists(Paths.get(filePath));
+  }
+
+  @Test
+  public void test1() throws IOException, WriteProcessException {

Review Comment:
   The test method names `test1`, `test2`, `test3` don’t communicate what 
scenarios are being validated (e.g., existing time column vs generated time 
column, with/without tag columns). Renaming them to reflect the specific 
behavior under test will make failures much easier to triage.
   ```suggestion
     public void testWriteWithExistingTimeColumnAndTagColumns() throws 
IOException, WriteProcessException {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to