Thanks for raising it. Yes, you're right, it is indeed a bug.
The problem is that the RowData produced by LineBytesInputFormat is reused, but
DeserializationSchemaAdapter#Reader only makes a shallow copy of the produced data,
so every row in the final result ends up holding the value of the last row.
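
To illustrate the failure mode in isolation, here is a minimal sketch in plain Java. It is not the actual Flink code: MutableRow, readWithoutCopy, and readWithCopy are hypothetical names, and MutableRow only stands in for a reused, mutable row object such as RowData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReusedRowDemo {

    // Hypothetical stand-in for a mutable row object that a reader reuses.
    static final class MutableRow {
        String line;

        // Creates an independent copy of the current field values.
        MutableRow copy() {
            MutableRow r = new MutableRow();
            r.line = line;
            return r;
        }
    }

    // Buggy pattern: the buffer stores references to the same reused instance,
    // so every buffered entry reflects the last value written to it.
    static List<String> readWithoutCopy(List<String> input) {
        MutableRow reused = new MutableRow();
        List<MutableRow> buffer = new ArrayList<>();
        for (String s : input) {
            reused.line = s;
            buffer.add(reused);
        }
        return lines(buffer);
    }

    // Safe pattern: each row is copied before it is buffered.
    static List<String> readWithCopy(List<String> input) {
        MutableRow reused = new MutableRow();
        List<MutableRow> buffer = new ArrayList<>();
        for (String s : input) {
            reused.line = s;
            buffer.add(reused.copy());
        }
        return lines(buffer);
    }

    // Extracts the line value of each buffered row.
    private static List<String> lines(List<MutableRow> rows) {
        List<String> out = new ArrayList<>();
        for (MutableRow r : rows) {
            out.add(r.line);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("one", "two", "three");
        System.out.println(readWithoutCopy(input)); // prints [three, three, three]
        System.out.println(readWithCopy(input));    // prints [one, two, three]
    }
}
```

The first call mirrors the symptom in your output, where every printed row shows the last line of the file.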

Could you please create a Jira issue to track it?

Best regards,
Yuxia

----- Original Message -----
From: "Marco Villalobos" <mvillalo...@kineteque.com>
To: "User" <user@flink.apache.org>
Sent: Thursday, August 18, 2022 6:08:33 AM
Subject: Is this a Batch SQL Bug?

Given this program:

```java
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

   public static void main(String[] args) {
      final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRuntimeMode(RuntimeExecutionMode.BATCH);
      final StreamTableEnvironment tableEnv =
            StreamTableEnvironment.create(env);
      tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
            "        `file.path`              STRING NOT NULL METADATA,\n" +
            "        `file.name`              STRING NOT NULL METADATA,\n" +
            "        `file.size`              BIGINT NOT NULL METADATA,\n" +
            "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
            "        line                    STRING\n" +
            "      ) WITH (\n" +
            "        'connector' = 'filesystem', \n" +
            "        'format' = 'raw'\n" +
            "      );");
      tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
            "      WITH (\n" +
            "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
            "      ) LIKE historical_raw_source_template;");

      final TableResult output =
            tableEnv.from("historical_raw_source").select($("line")).execute();
      output.print();
   }
}
```

and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' 
directory:

```text
one
two
three
four
five
six
seven
eight
nine
ten
```

The print results are:
```text
+----+--------------------------------+
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
+----+--------------------------------+
10 rows in set
```

I was expecting all rows to print. If this is not a bug, then what am I 
misunderstanding?

I do notice that the transient field:

private transient RecordCollector collector;

in 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter.LineBytesInputFormat

becomes empty on each iteration, as though it failed to serialize correctly.

Regardless, I don't know what's wrong.  Any advice would deeply help.

Marco A. Villalobos
