Copilot commented on code in PR #753:
URL: https://github.com/apache/tsfile/pull/753#discussion_r2998487478
##########
java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TableChunkGroupWriterImpl.java:
##########
@@ -33,4 +39,65 @@ public TableChunkGroupWriterImpl(IDeviceID deviceId,
EncryptParameter encryptPar
super(deviceId, encryptParam);
setConvertColumnNameToLowerCase(true);
}
+
+ public int write(
+ Column timeColumn,
+ Column[] valueColumns,
+ List<IMeasurementSchema> measurementSchemas,
+ int startRowIndex,
+ int endRowIndex)
+ throws IOException {
+ int pointCount = 0;
+ ValueChunkWriter[] valueChunkWriters = new
ValueChunkWriter[valueColumns.length];
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ valueChunkWriters[i] =
tryToAddSeriesWriterInternal(measurementSchemas.get(i));
+ }
+ for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
+ long time = timeColumn.getLong(rowIndex);
+ for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length;
valueColumnIndex++) {
+ Column valueColumn = valueColumns[valueColumnIndex];
+ IMeasurementSchema measurementSchema =
measurementSchemas.get(valueColumnIndex);
+ ValueChunkWriter valueChunkWriter = valueChunkWriters[rowIndex];
+ boolean isNull = valueColumn.isNull(rowIndex);
Review Comment:
In `write(...)`, the `ValueChunkWriter` is indexed by `rowIndex`
(`valueChunkWriters[rowIndex]`) instead of by `valueColumnIndex`. This will
throw `ArrayIndexOutOfBoundsException` as soon as `rowIndex` reaches
`valueColumns.length`, and it also writes data into the wrong series. Use the
value-column index when selecting the writer, and consider validating that
`measurementSchemas.size()` matches `valueColumns.length` before
initializing/iterating, to avoid an `ArrayIndexOutOfBoundsException` or null
writers. Also, this method currently skips the out-of-order check
(`checkIsHistoryData(time)`) that other chunk writers perform, so out-of-order
writes may silently corrupt state; it should apply the same check (and adjust
the throws signature accordingly).
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert TsBlock (table model) into TsFile format. Core responsibilities: 1.
Split TsBlock by
+ * device (based on tag columns) 2. Optionally generate a new time column per
device 3. Dispatch
+ * rows to corresponding ChunkGroupWriter
+ */
+public class TableTsBlock2TsFileWriter extends DeviceTableModelWriter {
+
+ private final boolean generateNewTimeColumn;
+ private final int timeColumnIndexInTsBlock;
+ private final int[] tagColumnIndexInTsBlock;
+ private final int[] fieldColumnIndexInTsBlock;
+ private final IMeasurementSchema[] fieldColumnSchemas;
+
+ private final String tableName;
+ private final Map<IDeviceID, Long> deviceRowCountMap;
+
+ private int rowCount = 0;
+
+ public TableTsBlock2TsFileWriter(
+ File file,
+ TableSchema tableSchema,
+ long memoryThreshold,
+ boolean generateNewTimeColumn,
+ int timeColumnIndexInTsBlock,
+ int[] tagColumnIndexesInTsBlock,
+ int[] fieldColumnIndexesInTsBlock,
+ IMeasurementSchema[] fieldColumnSchemas)
+ throws IOException {
+ super(file, tableSchema, memoryThreshold);
+ this.tableName = tableSchema.getTableName();
+ this.generateNewTimeColumn = generateNewTimeColumn;
+ this.timeColumnIndexInTsBlock = timeColumnIndexInTsBlock;
+ this.tagColumnIndexInTsBlock = tagColumnIndexesInTsBlock;
+ this.fieldColumnIndexInTsBlock = fieldColumnIndexesInTsBlock;
+ this.deviceRowCountMap = generateNewTimeColumn ? new HashMap<>() : null;
+ this.fieldColumnSchemas = fieldColumnSchemas;
+ }
+
+ public void write(TsBlock tsBlock) throws IOException, WriteProcessException
{
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ return;
+ }
+ // Split TsBlock into device partitions and prepare time column
+ Pair<Column, List<Pair<IDeviceID, Integer>>>
timeColumnAndDeviceIdEndIndexPairs =
+ splitTsBlockByDeviceAndGetTimeColumn(tsBlock);
+ Column timeColumn = timeColumnAndDeviceIdEndIndexPairs.left;
+ // Extract value columns according to schema mapping
+ Column[] valueColumns = new Column[fieldColumnIndexInTsBlock.length];
+ for (int i = 0; i < valueColumns.length; i++) {
+ valueColumns[i] = tsBlock.getColumn(fieldColumnIndexInTsBlock[i]);
+ }
+ List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs =
timeColumnAndDeviceIdEndIndexPairs.right;
+ int startIndex = 0;
+
+ // Iterate each device segment and write data into its ChunkGroup
+ for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
+ TableTsBlockChunkGroupWriterImpl chunkGroupWriter =
+ (TableTsBlockChunkGroupWriterImpl)
tryToInitialGroupWriter(pair.left, true, true);
+ int writeCount = chunkGroupWriter.write(timeColumn, valueColumns,
startIndex, pair.right);
+ rowCount += writeCount;
+ recordCount += writeCount;
+ startIndex = pair.right;
+ }
+
+ this.checkMemorySizeAndMayFlushChunks();
+ }
+
+ /**
+ * Split TsBlock by device boundary. If generateNewTimeColumn is true,
generate a monotonically
+ * increasing time column per device using deviceRowCountMap.
+ *
+ * @return Pair of (time column, device -> end index list)
+ */
+ private Pair<Column, List<Pair<IDeviceID, Integer>>>
splitTsBlockByDeviceAndGetTimeColumn(
+ TsBlock tsBlock) {
+ long[] timestamps = null;
+ if (generateNewTimeColumn) {
+ timestamps = new long[tsBlock.getPositionCount()];
+ }
+ List<Pair<IDeviceID, Integer>> deviceSplitResult = new ArrayList<>();
+ IDeviceID lastDeviceID = null;
+ long lastDeviceCount = 0;
+
+ // Iterate rows and detect device boundary changes
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ IDeviceID currDeviceID = getDeviceId(tsBlock, i);
+ // Device changed, flush previous segment
+ if (!currDeviceID.equals(lastDeviceID)) {
+ if (lastDeviceID != null) {
+ deviceSplitResult.add(new Pair(lastDeviceID, i));
+ if (generateNewTimeColumn) {
+ deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+ }
+ }
+ lastDeviceID = currDeviceID;
+ if (generateNewTimeColumn) {
+ lastDeviceCount = deviceRowCountMap.getOrDefault(lastDeviceID, 0L);
+ }
+ }
+ // Generate synthetic time if required
+ if (generateNewTimeColumn) {
+ timestamps[i] = lastDeviceCount++;
+ }
+ }
+
+ deviceSplitResult.add(new Pair(lastDeviceID, tsBlock.getPositionCount()));
+ if (generateNewTimeColumn) {
+ deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+ return new Pair<>(new TimeColumn(timestamps.length, timestamps),
deviceSplitResult);
+ } else {
+ return new Pair<>(tsBlock.getColumn(timeColumnIndexInTsBlock),
deviceSplitResult);
+ }
+ }
+
+ private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) {
+ String[] segments = new String[tagColumnIndexInTsBlock.length + 1];
+ segments[0] = tableName;
+ for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) {
+ segments[i + 1] =
+ tsBlock
+ .getValueColumns()[tagColumnIndexInTsBlock[i]]
+ .getBinary(rowIdx)
+ .getStringValue(TSFileConfig.STRING_CHARSET);
Review Comment:
`getDeviceId` assumes every tag column is a non-null STRING/BINARY and
unconditionally calls `getBinary(rowIdx).getStringValue(...)`. If a tag value
is null (or the tag column type isn’t binary-compatible), this will throw at
runtime. Consider following the `Tablet#getDeviceID` approach: check
`isNull(rowIdx)` and use `getObject(rowIdx)`/`toString()` (or otherwise handle
non-string tag types) so device ID generation is robust to nulls and supported
tag types.
```suggestion
Column tagColumn =
tsBlock.getValueColumns()[tagColumnIndexInTsBlock[i]];
if (tagColumn.isNull(rowIdx)) {
segments[i + 1] = null;
} else {
Object value = tagColumn.getObject(rowIdx);
segments[i + 1] = value == null ? null : value.toString();
}
```
##########
java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.TsFileGeneratorForTest;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class TableTsBlock2TsFileWriterTest {
+
+ private String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0,
0, 0);
+
+ @After
+ public void tearDown() throws Exception {
+ Files.deleteIfExists(Paths.get(filePath));
+ }
+
+ @Test
+ public void test1() throws IOException, WriteProcessException {
Review Comment:
The test method names `test1`, `test2`, `test3` don’t communicate what
scenarios are being validated (e.g., existing time column vs generated time
column, with/without tag columns). Renaming them to reflect the specific
behavior under test will make failures much easier to triage.
```suggestion
public void testWriteWithExistingTimeColumnAndTagColumns() throws
IOException, WriteProcessException {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]