JackieTien97 commented on code in PR #793:
URL: https://github.com/apache/tsfile/pull/793#discussion_r3159636826


##########
java/tools/src/main/java/org/apache/tsfile/tools/ImportSchemaParser.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.tools;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ImportSchemaParser {
+
+  private enum Section {
+    NONE,
+    TAG_COLUMNS,
+    SOURCE_COLUMNS
+  }
+
+  public static ImportSchema parse(String filePath) throws IOException {
+    ImportSchema schema = new ImportSchema();
+    List<ImportSchema.TagColumn> tagColumns = new ArrayList<>();
+    List<ImportSchema.SourceColumn> sourceColumns = new ArrayList<>();
+
+    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) 
{
+      String line;
+      Section section = Section.NONE;
+
+      while ((line = reader.readLine()) != null) {
+        line = line.trim();
+        if (line.isEmpty() || line.startsWith("//")) {
+          continue;
+        }
+
+        if (line.startsWith("table_name=")) {
+          schema.setTableName(extractValue(line));
+          section = Section.NONE;
+        } else if (line.startsWith("time_precision=")) {
+          schema.setTimePrecision(extractValue(line));
+          section = Section.NONE;
+        } else if (line.startsWith("has_header=")) {
+          String val = extractValue(line);
+          if (!"true".equals(val) && !"false".equals(val)) {
+            throw new IllegalArgumentException("has_header must be true or 
false");
+          }
+          schema.setHasHeader(Boolean.parseBoolean(val));
+          section = Section.NONE;
+        } else if (line.startsWith("separator=")) {
+          schema.setSeparator(extractValue(line));
+          section = Section.NONE;
+        } else if (line.startsWith("null_format=")) {
+          schema.setNullFormat(extractValue(line));
+          section = Section.NONE;
+        } else if (line.startsWith("time_column=")) {
+          schema.setTimeColumnName(extractValue(line));
+          section = Section.NONE;
+        } else if (line.equals("tag_columns") || line.equals("id_columns")) {
+          section = Section.TAG_COLUMNS;
+        } else if (line.equals("source_columns") || 
line.equals("csv_columns")) {
+          section = Section.SOURCE_COLUMNS;
+        } else if (section == Section.TAG_COLUMNS) {
+          tagColumns.add(parseTagColumn(line));
+        } else if (section == Section.SOURCE_COLUMNS) {
+          sourceColumns.add(parseSourceColumn(line));
+        }
+      }
+    }
+
+    schema.setTagColumns(tagColumns);
+    schema.setSourceColumns(sourceColumns);
+
+    if ("tab".equals(schema.getSeparator())) {
+      schema.setSeparator("\t");
+    }
+
+    validate(schema);
+    return schema;
+  }
+
+  private static String extractValue(String line) {
+    int index = line.indexOf('=');
+    return line.substring(index + 1);
+  }
+
+  private static ImportSchema.TagColumn parseTagColumn(String line) {
+    String[] parts = line.split(" ");
+    if (parts.length == 3 && parts[1].trim().equalsIgnoreCase("DEFAULT")) {
+      return new ImportSchema.TagColumn(parts[0].trim(), parts[2].trim());
+    } else if (parts.length == 1) {
+      return new ImportSchema.TagColumn(parts[0].trim());
+    }
+    throw new IllegalArgumentException("Invalid tag_columns format: " + line);
+  }
+
+  private static ImportSchema.SourceColumn parseSourceColumn(String line) {
+    String[] parts = line.split(" ");
+    String name = parts[0].trim();
+    if (name.endsWith(",") || name.endsWith(";")) {
+      name = name.substring(0, name.length() - 1);
+    }
+
+    if (parts.length == 2) {
+      String dataType = parts[1].trim();
+      if (dataType.endsWith(",") || dataType.endsWith(";")) {
+        dataType = dataType.substring(0, dataType.length() - 1);
+      }
+      if (dataType.equalsIgnoreCase("SKIP")) {
+        return ImportSchema.SourceColumn.skip(name);
+      }
+      return new ImportSchema.SourceColumn(name, resolveDataType(dataType));
+    } else if (parts.length == 1) {
+      if (name.equalsIgnoreCase("SKIP")) {
+        return ImportSchema.SourceColumn.skip();
+      }
+      return new ImportSchema.SourceColumn(name, TSDataType.STRING);
+    }
+    throw new IllegalArgumentException("Invalid source_columns format: " + 
line);
+  }
+
+  private static TSDataType resolveDataType(String typeStr) {
+    switch (typeStr.toUpperCase()) {
+      case "TEXT":
+        return TSDataType.TEXT;
+      case "STRING":
+        return TSDataType.STRING;
+      case "INT32":
+        return TSDataType.INT32;
+      case "INT64":
+        return TSDataType.INT64;
+      case "FLOAT":
+        return TSDataType.FLOAT;
+      case "DOUBLE":
+        return TSDataType.DOUBLE;
+      case "BOOLEAN":
+        return TSDataType.BOOLEAN;
+      case "BLOB":
+        return TSDataType.BLOB;
+      case "DATE":
+        return TSDataType.DATE;
+      case "TIMESTAMP":
+        return TSDataType.TIMESTAMP;
+      default:
+        throw new IllegalArgumentException("Unknown data type: " + typeStr);
+    }
+  }
+
+  private static void validate(ImportSchema schema) {
+    String tp = schema.getTimePrecision();
+    if (!"ms".equals(tp) && !"us".equals(tp) && !"ns".equals(tp)) {

Review Comment:
   **Bug:** This validation rejects `"s"` as a valid `time_precision`, but 
`TimeConverter`, the CLI `--time_precision` option, the README (which lists `ms 
/ us / ns / s`), and `ArrowSourceReader.detectTimestampPrecision()` (returns 
`"s"` for `SECOND`) all support it.
   
   A schema file with `time_precision=s` will fail here.
   
   **Suggested fix:**
   ```java
   if (!"ms".equals(tp) && !"us".equals(tp) && !"ns".equals(tp) && 
!"s".equals(tp)) {
       throw new IllegalArgumentException("time_precision must be ms, us, ns, 
or s");
   }
   ```
   Or extract a `VALID_PRECISIONS` set for single-source-of-truth.



##########
java/tools/src/main/java/org/apache/tsfile/tools/ArrowSourceReader.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.tools;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ArrowSourceReader implements SourceReader {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ArrowSourceReader.class);
+
+  private final File sourceFile;
+  private ImportSchema schema;
+  private BufferAllocator allocator;
+  private ArrowFileReader arrowReader;
+  private Schema arrowSchema;
+  private List<ArrowBlock> recordBatches;
+  private int currentBatchIndex;
+  private boolean exhausted;
+
+  private String overrideTableName;
+  private String overrideTimePrecision;
+
+  public ArrowSourceReader(File sourceFile, ImportSchema schema) {
+    this.sourceFile = sourceFile;
+    this.schema = schema;
+    this.exhausted = false;
+    this.currentBatchIndex = 0;
+  }
+
+  public ArrowSourceReader(File sourceFile) {
+    this.sourceFile = sourceFile;
+    this.schema = null;
+    this.exhausted = false;
+    this.currentBatchIndex = 0;
+  }
+
+  public void setOverrideTableName(String tableName) {
+    this.overrideTableName = tableName;
+  }
+
+  public void setOverrideTimePrecision(String timePrecision) {
+    this.overrideTimePrecision = timePrecision;
+  }
+
+  @Override
+  public ImportSchema inferSchema() {
+    if (schema != null) {
+      throw new UnsupportedOperationException("inferSchema() is only available 
in auto mode");
+    }
+
+    try {
+      ensureReaderOpen();
+
+      List<String> columnNames = new ArrayList<>();
+      List<TSDataType> columnTypes = new ArrayList<>();
+      String detectedTimePrecision = null;
+
+      for (Field field : arrowSchema.getFields()) {
+        String name = field.getName();
+        columnNames.add(name);
+        TSDataType tsType = mapArrowType(field.getType());
+        columnTypes.add(tsType);
+
+        if (("time".equals(name) || "TIME".equals(name)) && 
detectedTimePrecision == null) {
+          detectedTimePrecision = detectTimestampPrecision(field.getType());
+        }
+      }
+
+      String timeColumn = AutoSchemaInferer.detectTimeColumn(columnNames);
+      TSDataType[] types = columnTypes.toArray(new TSDataType[0]);
+
+      String tableName =
+          overrideTableName != null
+              ? overrideTableName
+              : AutoSchemaInferer.deriveTableName(sourceFile.getName(), 
"arrow_data");
+
+      String timePrecision;
+      if (overrideTimePrecision != null) {
+        timePrecision = overrideTimePrecision;
+      } else if (detectedTimePrecision != null) {
+        timePrecision = detectedTimePrecision;
+      } else {
+        timePrecision = "ms";
+      }
+
+      schema =
+          AutoSchemaInferer.buildAutoSchema(
+              tableName, timeColumn, columnNames, types, timePrecision);
+      return schema;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to infer schema from: " + 
sourceFile.getAbsolutePath(), e);
+    }
+  }
+
+  @Override
+  public SourceBatch readBatch() {
+    if (exhausted) {
+      return null;
+    }
+
+    try {
+      ensureReaderOpen();
+
+      if (currentBatchIndex >= recordBatches.size()) {
+        exhausted = true;
+        return null;
+      }
+
+      arrowReader.loadRecordBatch(recordBatches.get(currentBatchIndex));
+      currentBatchIndex++;
+
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      int rowCount = root.getRowCount();
+      if (rowCount == 0) {
+        if (currentBatchIndex >= recordBatches.size()) {
+          exhausted = true;
+          return null;
+        }
+        return readBatch();
+      }
+
+      List<String> schemaColumnNames = getSchemaColumnNames();
+      Map<String, FieldVector> vectorMap = new HashMap<>();
+      for (FieldVector vec : root.getFieldVectors()) {
+        vectorMap.put(vec.getName(), vec);
+      }
+
+      int numCols = schemaColumnNames.size();
+      List<Object[]> rows = new ArrayList<>(rowCount);
+
+      for (int r = 0; r < rowCount; r++) {
+        Object[] row = new Object[numCols];
+        for (int c = 0; c < numCols; c++) {
+          String colName = schemaColumnNames.get(c);
+          FieldVector vec = vectorMap.get(colName);
+          if (vec == null || vec.isNull(r)) {
+            row[c] = null;
+          } else {
+            row[c] = extractValue(vec, r);
+          }
+        }
+        rows.add(row);
+      }
+
+      return SourceBatch.fromRows(schemaColumnNames, rows);
+    } catch (IOException e) {
+      LOGGER.error("Error reading Arrow file: " + 
sourceFile.getAbsolutePath(), e);
+      exhausted = true;
+      return null;
+    }
+  }
+
+  @Override
+  public void close() {
+    if (arrowReader != null) {
+      try {
+        arrowReader.close();
+      } catch (IOException e) {
+        LOGGER.error("Error closing Arrow reader", e);
+      }
+      arrowReader = null;
+    }
+    if (allocator != null) {
+      allocator.close();
+      allocator = null;
+    }
+  }
+
+  private void ensureReaderOpen() throws IOException {
+    if (arrowReader == null) {
+      allocator = new RootAllocator();
+      arrowReader = new ArrowFileReader(new 
FileInputStream(sourceFile).getChannel(), allocator);

Review Comment:
   **Resource leak:** The `FileInputStream` is created as an anonymous 
temporary — only its channel is captured. The stream itself is never closed. 
Even if `ArrowFileReader.close()` closes the channel, the `FileInputStream` 
object may hold OS-level resources.
   
   **Suggested fix:** store it as a field and close it in `close()`:
   ```java
   private FileInputStream fileInputStream;
   
   private void ensureReaderOpen() throws IOException {
       if (arrowReader == null) {
           allocator = new RootAllocator();
           fileInputStream = new FileInputStream(sourceFile);
           arrowReader = new ArrowFileReader(fileInputStream.getChannel(), 
allocator);
           // ...
       }
   }
   
   public void close() {
       // ... close arrowReader first ...
       if (fileInputStream != null) {
           try { fileInputStream.close(); } catch (IOException e) { ... }
       }
       // ... close allocator ...
   }
   ```



##########
java/tools/pom.xml:
##########
@@ -52,6 +55,87 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>1.14.4</version>

Review Comment:
   Hardcoded dependency versions (`parquet-hadoop 1.14.4`, `hadoop-common 
3.3.6`, `arrow-vector 15.0.2`) should be managed via `<properties>` — ideally 
in the parent POM — for consistent version management across modules. This also 
makes upgrades more visible in diffs.



##########
java/tools/pom.xml:
##########
@@ -28,6 +28,9 @@
     </parent>
     <artifactId>tools</artifactId>
     <name>TsFile: Java: Tools</name>
+    <properties>
+        <enforcer.skip>true</enforcer.skip>

Review Comment:
   Silently skipping the Maven enforcer hides dependency convergence issues. 
The heavy Hadoop/Arrow dependency tree likely triggers this — please either:
   1. Resolve the convergence conflicts explicitly (preferred for Apache 
projects), or
   2. Add a comment explaining why it's safe to skip and what specific conflict 
it works around.



##########
java/tools/README.md:
##########
@@ -119,9 +104,108 @@ Time INT64,
 Temperature FLOAT,
 Emission DOUBLE,
 ```
-## Commands
 
+In this example:
+- `Group` is a virtual tag column (not in CSV) with default value `Datang`
+- `Region`, `FactoryNumber`, `DeviceNumber` are tag columns read from CSV
+- `Model` and `MaintenanceCycle` are skipped via `SKIP`
+- `Temperature` and `Emission` are automatically derived as FIELD columns
+
+For Parquet / Arrow in schema mode, `source_columns` matches by column 
**name** instead of position. Named SKIP is also supported:
+```
+source_columns
+Time INT64,
+unused_col SKIP,
+Temperature FLOAT,
+Emission DOUBLE,
+```
+
+## CLI Parameters
+
+| Parameter | Description | Required | Default |
+|-----------|------------|----------|---------|
+| -s, --source | Input file or directory | Yes | |
+| -t, --target | Output directory | Yes | |
+| --schema | Schema file path. Omit for auto mode. | No | |
+| --fail_dir | Directory for failed source files | No | failed |
+| --format | Source format: csv / parquet / arrow. Auto-detected by file 
extension if omitted. | No | auto-detect |
+| --table_name | Table name override (auto mode) | No | derived from filename |
+| --time_precision | Time precision override (auto mode): ms / us / ns / s | 
No | ms |
+| --separator | CSV delimiter (auto mode): , / tab / ; | No | , |
+| -b, --block_size | CSV chunk size (e.g. 256M, 1G) | No | 256M |
+| -tn, --thread_num | Thread count for parallel processing | No | 8 |
+
+## Modes
+
+### Schema Mode
+
+Provide a `--schema` file to explicitly define column mapping, types, tags, 
and time column.
+
+```sh
+# CSV
+csv2tsfile.sh --source ./data/csv --target ./output --fail_dir ./failed 
--schema ./schema/import.schema
+csv2tsfile.bat --source .\data\csv --target .\output --fail_dir .\failed 
--schema .\schema\import.schema
+
+# Parquet
+parquet2tsfile.sh --source ./data/parquet --target ./output --fail_dir 
./failed --schema ./schema/import.schema
+parquet2tsfile.bat --source .\data\parquet --target .\output --fail_dir 
.\failed --schema .\schema\import.schema
+
+# Arrow
+arrow2tsfile.sh --source ./data/arrow --target ./output --fail_dir ./failed 
--schema ./schema/import.schema
+arrow2tsfile.bat --source .\data\arrow --target .\output --fail_dir .\failed 
--schema .\schema\import.schema
+```
+
+### Auto Mode
+
+Omit `--schema` to automatically infer column types and detect the time column.
+
+**Auto mode rules:**
+- Time column: must be named exactly `time` or `TIME` (case-sensitive, strict 
match)
+- All other columns become FIELD (no tag inference)
+- CSV type inference uses a 100-row sampling window with promotion chain: 
`BOOLEAN → INT64 → DOUBLE → STRING`

Review Comment:
   **Doc mismatch with code:** This implies BOOLEAN promotes to INT64, but 
`AutoSchemaInferer.promote()` sends BOOLEAN mixed with **any** other type 
directly to STRING:
   ```java
   if (current == InferredType.BOOLEAN || incoming == InferredType.BOOLEAN) {
       return InferredType.STRING;
   }
   ```
   The actual behavior: INT64 ↔ DOUBLE promotes to DOUBLE; any other mixed pair 
(including BOOLEAN + numeric) promotes to STRING. The promotion chain 
description should reflect this.



##########
java/tools/src/main/java/org/apache/tsfile/tools/CsvSourceReader.java:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.tools;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CsvSourceReader implements SourceReader {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CsvSourceReader.class);
+  private static final long DEFAULT_CHUNK_SIZE = 256L * 1024 * 1024;
+
+  private final File sourceFile;
+  private ImportSchema schema;
+  private final long chunkSizeBytes;
+  private final String separator;
+
+  private BufferedReader reader;
+  private String[] columnNames;
+  private boolean headerConsumed;
+  private boolean exhausted;
+
+  private List<Object[]> bufferedSampleRows;
+  private String overrideTableName;
+  private String overrideTimePrecision;
+
+  public CsvSourceReader(File sourceFile, ImportSchema schema) {
+    this(sourceFile, schema, DEFAULT_CHUNK_SIZE);
+  }
+
+  public CsvSourceReader(File sourceFile, ImportSchema schema, long 
chunkSizeBytes) {
+    this.sourceFile = sourceFile;
+    this.schema = schema;
+    this.chunkSizeBytes = chunkSizeBytes;
+    this.separator = schema.getSeparator();
+    this.headerConsumed = false;
+    this.exhausted = false;
+  }
+
+  public CsvSourceReader(File sourceFile, String separator) {
+    this(sourceFile, separator, DEFAULT_CHUNK_SIZE);
+  }
+
+  public CsvSourceReader(File sourceFile, String separator, long 
chunkSizeBytes) {
+    this.sourceFile = sourceFile;
+    this.schema = null;
+    this.chunkSizeBytes = chunkSizeBytes;
+    this.separator = separator != null ? separator : ",";
+    this.headerConsumed = false;
+    this.exhausted = false;
+  }
+
+  public void setOverrideTableName(String tableName) {
+    this.overrideTableName = tableName;
+  }
+
+  public void setOverrideTimePrecision(String timePrecision) {
+    this.overrideTimePrecision = timePrecision;
+  }
+
+  @Override
+  public ImportSchema inferSchema() {
+    if (schema != null) {
+      throw new UnsupportedOperationException(
+          "inferSchema() is only available in auto mode (no schema provided)");
+    }
+
+    try {
+      ensureReaderOpen();
+
+      String headerLine = reader.readLine();
+      if (headerLine == null) {
+        throw new IllegalArgumentException("CSV file is empty: " + 
sourceFile.getAbsolutePath());
+      }
+      columnNames = splitLine(headerLine);
+      headerConsumed = true;
+
+      List<String> colNameList = new ArrayList<>(columnNames.length);
+      for (String name : columnNames) {
+        colNameList.add(name);
+      }
+
+      bufferedSampleRows = new ArrayList<>();
+      for (int i = 0; i < AutoSchemaInferer.DEFAULT_SAMPLE_SIZE; i++) {
+        String line = reader.readLine();
+        if (line == null) {
+          exhausted = true;
+          break;
+        }
+        bufferedSampleRows.add(parseLineAutoMode(line));
+      }
+
+      String timeColumn = AutoSchemaInferer.detectTimeColumn(colNameList);
+      TSDataType[] types =
+          AutoSchemaInferer.inferColumnTypes(
+              colNameList,
+              bufferedSampleRows,
+              timeColumn,
+              AutoSchemaInferer.DEFAULT_CSV_NULL_TOKENS);
+
+      String tableName =
+          overrideTableName != null
+              ? overrideTableName
+              : AutoSchemaInferer.deriveTableName(sourceFile.getName(), 
"csv_data");
+      String timePrecision = overrideTimePrecision != null ? 
overrideTimePrecision : "ms";
+
+      schema =
+          AutoSchemaInferer.buildAutoSchema(
+              tableName, timeColumn, colNameList, types, timePrecision);
+      schema.setNullFormat("\\N");
+
+      return schema;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to infer schema from: " + 
sourceFile.getAbsolutePath(), e);
+    }
+  }
+
+  @Override
+  public SourceBatch readBatch() {
+    boolean hasBuffered = bufferedSampleRows != null && 
!bufferedSampleRows.isEmpty();
+    if (exhausted && !hasBuffered) {
+      return null;
+    }
+
+    try {
+      ensureReaderOpen();
+
+      if (schema.isHasHeader() && !headerConsumed) {
+        String headerLine = reader.readLine();
+        if (headerLine == null) {
+          exhausted = true;
+          return null;
+        }
+        columnNames = splitLine(headerLine);
+        validateColumnCount();
+        headerConsumed = true;
+      } else if (!headerConsumed) {
+        columnNames = buildColumnNamesFromSchema();
+        headerConsumed = true;
+      }
+
+      List<Object[]> rows = new ArrayList<>();
+      long currentSize = 0;
+
+      if (hasBuffered) {
+        rows.addAll(bufferedSampleRows);
+        bufferedSampleRows = null;
+      }
+
+      if (!exhausted) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8);
+          long lineSize = lineBytes.length;
+
+          if (currentSize > 0 && currentSize + lineSize > chunkSizeBytes) {

Review Comment:
   Minor: when the chunk boundary is hit, the boundary line is added to `rows` 
(line 184) and triggers a return — but `currentSize` is never updated to 
include this final line. This means chunks can be slightly larger than 
`chunkSizeBytes`. Not a functional issue (just an off-by-one on the 
accounting), but worth noting.



##########
java/tools/src/main/java/org/apache/tsfile/tools/ImportSchema.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.tools;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ImportSchema {
+
+  private String tableName = "";
+  private String timePrecision = "ms";
+  private boolean hasHeader = true;
+  private String separator = ",";
+  private String nullFormat;
+  private String timeColumnName = "";
+  private List<TagColumn> tagColumns = new ArrayList<>();
+  private List<SourceColumn> sourceColumns = new ArrayList<>();
+
+  public static class TagColumn {
+    private final String name;
+    private final boolean hasDefault;
+    private final String defaultValue;
+
+    public TagColumn(String name) {
+      this.name = name;
+      this.hasDefault = false;
+      this.defaultValue = null;
+    }
+
+    public TagColumn(String name, String defaultValue) {
+      this.name = name;
+      this.hasDefault = true;
+      this.defaultValue = defaultValue;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public boolean hasDefault() {
+      return hasDefault;
+    }
+
+    public String getDefaultValue() {
+      return defaultValue;
+    }
+
+    public boolean existsInSource() {

Review Comment:
   Nit: the name `existsInSource()` is a bit misleading — it actually returns 
`!hasDefault`, meaning "is not a virtual column." Consider renaming to 
`isVirtual()` (with inverted return) or at minimum adding a one-liner 
clarifying the semantics.



##########
java/tools/src/main/java/org/apache/tsfile/tools/ImportExecutor.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.tools;
+
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class ImportExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ImportExecutor.class);
+
+  private final ImportSchema importSchema;
+  private final TimeConverter timeConverter;
+  private final TabletBuilder tabletBuilder;
+
+  public ImportExecutor(ImportSchema importSchema) {
+    this.importSchema = importSchema;
+    this.timeConverter = new TimeConverter(importSchema.getTimePrecision());
+    this.tabletBuilder = new TabletBuilder(importSchema, timeConverter);
+  }
+
+  public boolean execute(SourceReader reader, String outputDir, String 
sourceBaseName) {
+    return execute(reader, outputDir, sourceBaseName, null);
+  }
+
+  public boolean execute(
+      SourceReader reader, String outputDir, String sourceBaseName, String 
failDir) {

Review Comment:
   `failDir` parameter is accepted but never used in the method body. This is 
misleading to callers who pass a fail directory expecting it to be wired up.
   
   Either use it (e.g., move/copy failed source files there on error) or remove 
the parameter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to