DanielCarter-stack commented on PR #10383:
URL: https://github.com/apache/seatunnel/pull/10383#issuecomment-3794732428

   <!-- code-pr-reviewer -->
   <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10383", "part": 1, 
"total": 1} -->
   ### Issue 1: BOM Processing Logic Error in Split Scenario (BLOCKER)
   
   **Location**: `CsvReadStrategy.java:102-105`
   
   **Code**:
   ```java
   try (BOMInputStream bomIn = new BOMInputStream(wrapInputStream(inputStream, 
split));
           BufferedReader reader =
                   new BufferedReader(new InputStreamReader(bomIn, 
getCharset(bomIn)));
           CSVParser csvParser = new CSVParser(reader, getCSVFormat())) {
   ```
   
   **Related Context**:
   - Caller: `AbstractReadStrategy.resolveArchiveCompressedInputStream()` 
passes InputStream starting from position `split.getStart()` in split scenario
   - `wrapInputStream()` internally calls `safeSlice(resultStream, 
split.getStart(), split.getLength())`
   - `safeSlice()` definition: `protected static InputStream 
safeSlice(InputStream in, long start, long length)` 
(AbstractReadStrategy.java:510)
   
   **Problem Description**:
   
   In split scenario (`enableSplitFile=true`), the original `inputStream` 
already starts from the middle of the file (e.g., starting from byte 1000). The 
`safeSlice()` in `wrapInputStream()` method will **skip `split.getStart()` 
bytes AGAIN** (e.g., skip 1000 bytes), causing:
   1. **Duplicate skip**: Actual data start position = `原始start + split.start`, 
skipping 2x offset
   2. **Data loss**: Data at the beginning of the file is skipped
   3. **BOM detection failure**: BOM is at the beginning of the file (bytes 
0-2), but split starts from the middle, BOM can never be detected
   
   **Example Scenario**:
   ```
   文件: [BOM 3字节][Header 20字节][Data1 50字节][Data2 50字节]...
          0-2        3-22          23-72        73-122
   
   Split1: start=0, length=75 → 读取 [BOM+Header+Data1]
   Split2: start=75, length=50 → 读取 [Data2]
   
   Split2处理流程:
   1. HDFS返回从byte 75开始的InputStream(已跳过前75字节)
   2. wrapInputStream()调用safeSlice(stream, 75, 50)
   3. safeSlice再次跳过75字节 → 实际从byte 150开始
   4. 结果:Data2完全丢失,读取了错误位置的数据
   ```
   
   **Root Cause**:
   Confusion between two split modes:
   1. **HDFS/FileSystem native split**: HDFS API can `open(path, start)`, 
return stream starting from start position (already skipped first start bytes)
   2. **Application layer split**: Use `safeSlice()` to trim complete stream
   
   Need to determine whether `inputStream` is already a split stream.
   
   **Potential Risks**:
   - Risk 1: Data error (skipping expected data)
   - Risk 2: Array out of bounds (skip exceeds stream length)
   - Risk 3: Silent error (data partially missing but no exception)
   
   **Impact Scope**:
   - Direct impact: `CsvReadStrategy.readProcess()` in `enableSplitFile=true` 
scenario
   - Indirect impact: All CSV files using split read
   - Impact surface: CSV Connector (common production scenario)
   
   **Severity**: **BLOCKER** - Breaks core functionality
   
   **Improvement Suggestions**:
   
   ```java
   // Solution 1: Pass whether InputStream is chunked via parameter 
(recommended)
   private InputStream wrapInputStream(InputStream inputStream, FileSourceSplit 
split, 
                                       boolean isAlreadySliced) throws 
IOException {
       InputStream resultStream;
       switch (compressFormat) {
           case LZO:
               LzopCodec lzo = new LzopCodec();
               resultStream = lzo.createInputStream(inputStream);
               break;
           case NONE:
               resultStream = inputStream;
               break;
           default:
               log.warn("Csv file does not support this compress type: {}",
                       compressFormat.getCompressCodec());
               resultStream = inputStream;
               break;
       }
       // Only perform slice when InputStream is not chunked
       if (enableSplitFile && split.getLength() > -1 && !isAlreadySliced) {
           resultStream = safeSlice(resultStream, split.getStart(), 
split.getLength());
       }
       return resultStream;
   }
   
   // Caller needs to pass the flag
   // Flag source: Check if inputStream is BoundedInputStream or other chunked 
stream type
   ```
   
   or
   
   ```java
   // Solution 2: Determine by checking InputStream type (more complex 
implementation)
   private InputStream wrapInputStream(InputStream inputStream, FileSourceSplit 
split) 
                                       throws IOException {
       InputStream resultStream;
       switch (compressFormat) {
           case LZO:
               LzopCodec lzo = new LzopCodec();
               resultStream = lzo.createInputStream(inputStream);
               break;
           case NONE:
               resultStream = inputStream;
               break;
           default:
               log.warn("Csv file does not support this compress type: {}",
                       compressFormat.getCompressCodec());
               resultStream = inputStream;
               break;
       }
       // Check if it's already BoundedInputStream (return type of safeSlice)
       boolean isAlreadySliced = (resultStream instanceof BoundedInputStream);
       if (enableSplitFile && split.getLength() > -1 && !isAlreadySliced) {
           resultStream = safeSlice(resultStream, split.getStart(), 
split.getLength());
       }
       return resultStream;
   }
   ```
   
   **Rationale**: 
   - Need to distinguish between "split stream" and "complete stream" to avoid 
duplicate skip
   - Solution 1 is clearer but requires interface modification or using 
ThreadLocal to pass state
   - Solution 2 is simple to implement but relies on specific class type 
(fragile)
   - Need to synchronously modify the same logic in `TextReadStrategy` and 
`JsonReadStrategy`
   
   ---
   
   ### Issue 2: Bug in Old LZO Compression + Split Scenario Not Explained in PR 
(CRITICAL)
   
   **Location**: `CsvReadStrategy.java:194` (new code) vs original code line 118
   
   **Old Code**:
   ```java
   switch (compressFormat) {
       case LZO:
           LzopCodec lzo = new LzopCodec();
           actualInputStream = lzo.createInputStream(inputStream);  // Create 
decompression stream
           break;
       // ...
   }
   if (enableSplitFile && split.getLength() > -1) {
       actualInputStream = safeSlice(inputStream, split.getStart(), 
split.getLength());  // ❌ Use original inputStream instead of actualInputStream
   }
   ```
   
   **New Code**:
   ```java
   switch (compressFormat) {
       case LZO:
           LzopCodec lzo = new LzopCodec();
           resultStream = lzo.createInputStream(inputStream);
           break;
       // ...
   }
   if (enableSplitFile && split.getLength() > -1) {
       resultStream = safeSlice(resultStream, split.getStart(), 
split.getLength());  // ✓ Use resultStream
   }
   ```
   
   **Problem Description**:
   
   Old code has serious bug in LZO compression + split scenario:
   1. First create decompression stream `lzo.createInputStream(inputStream)` → 
`actualInputStream`
   2. Then use **original compressed stream** `inputStream` for `safeSlice()` → 
overwrites `actualInputStream`
   3. Result: Actually reading **compressed stream** middle part, not 
decompressed stream
   
   **Example Impact**:
   ```
   场景:LZO压缩CSV文件,分片读取
   
   旧版行为:
   1. actualInputStream = 解压流(解压后100MB)
   2. actualInputStream = safeSlice(压缩流, start=1GB, length=10MB)
      → 对10MB压缩数据进行slice,然后作为CSV解析
      → 错误:解析了压缩二进制数据而非CSV文本
   
   预期行为:
   1. resultStream = 解压流
   2. resultStream = safeSlice(解压流, start=500MB, length=10MB)
      → 对解压后的数据进行slice
   ```
   
   **Potential Risks**:
   - Risk 1: Data parsing failure (treating compressed data as CSV)
   - Risk 2: Data corruption
   - Risk 3: Production task silent failure
   
   **Impact Scope**:
   - Direct impact: Split read of LZO compressed CSV files
   - Indirect impact: All CSV data sources using LZO compression
   - Impact surface: CSV Connector + LZO compression scenario
   
   **Severity**: **CRITICAL** - Fixed existing bug but not explained
   
   **Improvement Suggestions**:
   
   1. **Clearly explain in PR description**:
      ```
      ### Fixed Bugs
      - Fixed a bug where LZO compressed CSV files with split enabled would 
parse 
        compressed binary data instead of decompressed CSV text. The old code 
        incorrectly sliced the original compressed stream instead of the 
decompressed 
        stream.
      ```
   
   2. **添加测试用例**:
   ```java
   @Test
   public void testLzoCompressedCsvWithSplit() throws Exception {
       // Need to prepare LZO compressed CSV test file
       // Verify correctness of split read
   }
   ```
   
   3. **同步检查其他Reader**:
      - `TextReadStrategy.java:209` - 相同的bug需要修复
      - `JsonReadStrategy.java:119` - 相同的bug需要修复
   
   ** Reason**:
   - 这是对现有bug的修复,而非仅BOM处理
   - 应该在PR中明确说明,便于Code Review和回归测试
   - 其他Reader有相同bug,应该一起修复或创建跟踪Issue
   
   ---
   
   # ## Issue 3: BOM handling not tested when firstLineAsHeader=false (MAJOR)
   
   ** Location**: `CsvReadStrategyTest.java`
   
   ** Problem Description**:
   
   当前测试仅覆盖 `firstLineAsHeader=true` 场景:
   ```java
   map.put(FileBaseSourceOptions.CSV_USE_HEADER_LINE.key(), "true");
   ```
   
   但 `firstLineAsHeader=false` 时(用户显式提供schema),行为不同:
   - Header从 `inputCatalogTable` 获取,不从文件读取
   - BOMInputStream仍会跳过BOM
   - 但第一行数据可能以BOM开头(如果首列被当作数据)
   
   ** Potential Risk**:
   - 风险1:`firstLineAsHeader=false` 时,第一行数据首列可能包含BOM字符
   - 风险2:用户schema无BOM字符,数据不匹配
   
   ** Example Scenario**:
   ```
   File: 9821,hawk,37,M
   User schema: id(INT), name(STRING), age(INT), gender(STRING)
   
   Current behavior:
   1. BOMInputStream skips BOM
   2. First line read as: 9821,hawk,37,M ✓ Correct
   
   But if BOM handling fails:
   1. First line read as: \uFEFF9821,hawk,37,M
   2. Parse 9821 as INT → NumberFormatException or id=\uFEFF9821
   ```
   
   ** Scope of Impact**:
   - 直接影响:使用 `firstLineAsHeader=false` 且文件有BOM的场景
   - 间接影响:无
   - 影响面:CSV Connector特定配置
   
   ** Severity**: **MAJOR** - Edge case not covered
   
   ** Improvement Suggestion**:
   
   添加测试用例:
   ```java
   @Test
   public void testUtf8BomCsvWithoutHeaderRead() throws Exception {
       URL resource = 
CsvReadStrategyTest.class.getResource("/csv/utf8_bom.csv");
       String path = Paths.get(resource.toURI()).toString();
       CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
       LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
       csvReadStrategy.init(localConf);
       csvReadStrategy.getFileNamesByPath(path);
       // Do not set CSV_USE_HEADER_LINE, defaults to false
       csvReadStrategy.setPluginConfig(ConfigFactory.empty());
       csvReadStrategy.setCatalogTable(
               CatalogTableUtil.getCatalogTable(
                       "test",
                       new SeaTunnelRowType(
                               new String[] {"id", "name", "age", "gender"},
                               new SeaTunnelDataType[] {
                                   BasicType.INT_TYPE,
                                   BasicType.STRING_TYPE,
                                   BasicType.INT_TYPE,
                                   BasicType.STRING_TYPE
                               })));
       TestCollector testCollector = new TestCollector();
       csvReadStrategy.read(path, "", testCollector);
       final List<SeaTunnelRow> rows = testCollector.getRows();
       // Should read 3 lines (including header line as data)
       Assertions.assertEquals(3, rows.size());
       // First line (original header) should parse correctly, first column 
should not have BOM
       Assertions.assertEquals(9821, rows.get(0).getField(0));
       Assertions.assertEquals("hawk", rows.get(0).getField(1));
       // ...other assertions
   }
   ```
   
   ** Reason**:
   - 测试覆盖不足,边界场景未验证
   - BOMInputStream在 `firstLineAsHeader=false` 时的行为需要确认
   - 增加测试可防止未来回归
   
   ---
   
   # ## Issue 4: Insufficient explanation for cleanedHeaders necessity (MINOR)
   
   ** Location**: `CsvReadStrategy.java:122-125`
   
   ** Code**:
   ```java
   // Clean up BOM characters (\ uFEFF) in the header to solve occasional BOM 
residue
   // issues
   List<String> cleanedHeaders =
           headers.stream()
                   .map(header -> header.replace("\uFEFF", ""))
                   .collect(Collectors.toList());
   ```
   
   ** Problem Description**:
   
   注释提到"occasional BOM residue issues"(偶然的BOM残留问题),但未说明:
   1. 什么情况下BOMInputStream会失败?
   2. 如果BOMInputStream工作正常,为什么还需要 `replace("\uFEFF", "")`?
   3. 这是防御性编程还是已知边缘情况?
   
   ** Potential Risk**:
   - 风险1:代码意图不清晰
   - 风险2:可能掩盖真实的BOM处理bug
   
   ** Scope of Impact**:
   - 直接影响:代码可维护性
   - 间接影响:无
   - 影响面:CsvReadStrategy单一方法
   
   ** Severity**: **MINOR** - Documentation/comment issue
   
   ** Improvement Suggestion**:
   ```java
   // Although BOMInputStream should handle BOM correctly at the byte stream 
level,
   // we add this defensive cleaning to handle edge cases where:
   // 1. BOMInputStream fails to detect certain BOM variants
   // 2. BOM characters are embedded in the file content (not at the start)
   // 3. Encoding conversion issues (e.g., when BOM is present but encoding 
mismatch)
   List<String> cleanedHeaders =
           headers.stream()
                   .map(header -> header.replace("\uFEFF", ""))
                   .collect(Collectors.toList());
   ```
   
   或如果确实不需要,考虑移除并添加断言:
   ```java
   List<String> headers = getHeaders(csvParser);
   // Assert: BOMInputStream should have removed BOM, so headers shouldn't 
contain \uFEFF
   if (headers.stream().anyMatch(h -> h.contains("\uFEFF"))) {
       log.warn("BOM character found in CSV headers despite BOMInputStream 
processing. " +
                "Headers: {}", headers);
   }
   ```
   
   ** Reason**:
   - 代码可读性和可维护性
   - 防御性编程需要明确说明原因
   - 如果BOMInputStream工作正常,这个清理是多余的
   
   ---
   
   # ## Issue 5: Lack of support documentation for other BOM types (MINOR)
   
   ** Location**: `CsvReadStrategy.java:199-202`
   
   ** Code**:
   ```java
   private Charset getCharset(BOMInputStream bomIn) throws IOException {
       return bomIn.getBOM() == null
               ? Charset.forName(encoding)
               : Charset.forName(bomIn.getBOM().getCharsetName());
   }
   ```
   
   ** Problem Description**:
   
   当前代码支持自动检测多种BOM类型(UTF-8, UTF-16 LE, UTF-16 BE, UTF-32等),但:
   1. 测试仅覆盖UTF-8 BOM
   2. 文档未说明支持其他BOM类型
   3. 未说明当用户配置encoding与BOM声明的encoding不一致时的行为
   
   ** Example Scenario**:
   ```
   File: UTF-16 LE BOM file
   User config: encoding = "UTF-8"
   
   Current behavior:
   1. BOMInputStream detects UTF-16 LE BOM
   2. getCharset() returns "UTF-16LE"
   3. But user expects UTF-8 → may cause data error
   ```
   
   ** Potential Risk**:
   - 风险1:用户配置被忽略
   - 风险2:不同BOM类型的行为未测试
   
   ** Scope of Impact**:
   - 直接影响:非UTF-8 BOM文件的处理
   - 间接影响:无
   - 影响面:CSV Connector
   
   ** Severity**: **MINOR** - Feature enhancement/documentation issue
   
   ** Improvement Suggestion**:
   
   1. **添加测试**(可选,如果支持其他BOM):
   ```java
   @Test
   public void testUtf16BomCsv() throws Exception {
       // Prepare CSV file with UTF-16 BE/LE BOM
       // Verify automatic detection and correct parsing
   }
   ```
   
   2. **文档说明**:
   ```java
   /**
    * Detects the character set from BOM if present.
    * 
    * Supported BOM types:
    * - UTF-8 (EF BB BF)
    * - UTF-16 BE (FE FF)
    * - UTF-16 LE (FF FE)
    * - UTF-32 BE (00 00 FE FF)
    * - UTF-32 LE (FF FE 00 00)
    * 
    * If BOM is detected, the charset declared by the BOM takes precedence
    * over the user-configured encoding. This follows the standard behavior
    * of BOMInputStream.
    * 
    * @param bomIn the BOMInputStream to check
    * @return the detected or configured charset
    */
   private Charset getCharset(BOMInputStream bomIn) throws IOException {
       return bomIn.getBOM() == null
               ? Charset.forName(encoding)
               : Charset.forName(bomIn.getBOM().getCharsetName());
   }
   ```
   
   3. **或考虑添加配置选项**:
   ```java
   // If true, respect BOM charset; if false, always use user-configured 
encoding
   private boolean respectBomEncoding = true;
   ```
   
   ** Reason**:
   - 功能完整性
   - 用户明确知道行为
   - 如果不支持其他BOM,应该明确说明
   
   ---
   
   # ## Issue 6: Formatting error in comments (MINOR)
   
   ** Location**: `CsvReadStrategy.java:120`
   
   ** Code**:
   ```java
   // Clean up BOM characters (\ uFEFF) in the header to solve occasional BOM 
residue
   // issues
   ```
   
   ** Problem Description**: There's a space in the comment `(\ uFEFF)` which 
should be `(\uFEFF)`
   
   ** Scope of Impact**: Comment format
   
   ** Severity**: **MINOR** - Typo
   
   ** Improvement Suggestion**:
   ```java
   // Clean up BOM characters (\uFEFF) in the header to solve occasional BOM 
residue issues
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to