[ https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marco A. Villalobos updated FLINK-29039: ---------------------------------------- Description: RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus the result will always be the last row value. Given this program: {code:java} package mvillalobos.bug; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class IsThisABatchSQLBug { public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" + " `file.path` STRING NOT NULL METADATA,\n" + " `file.name` STRING NOT NULL METADATA,\n" + " `file.size` BIGINT NOT NULL METADATA,\n" + " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" + " line STRING\n" + " ) WITH (\n" + " 'connector' = 'filesystem', \n" + " 'format' = 'raw'\n" + " );"); tableEnv.executeSql("CREATE TABLE historical_raw_source\n" + " WITH (\n" + " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" + " ) LIKE historical_raw_source_template;"); final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute(); output.print(); } } {code} and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory: {code:java} one two three four five six seven eight nine ten {code} The print results are: {code:java} +----+--------------------------------+ | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | 
+----+--------------------------------+ 10 rows in set {code} was: RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row value. Given this program: {code:java} package mvillalobos.bug;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class IsThisABatchSQLBug { public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" + " `file.path` STRING NOT NULL METADATA,\n" + " `file.name` STRING NOT NULL METADATA,\n" + " `file.size` BIGINT NOT NULL METADATA,\n" + " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" + " line STRING\n" + " ) WITH (\n" + " 'connector' = 'filesystem', \n" + " 'format' = 'raw'\n" + " );"); tableEnv.executeSql("CREATE TABLE historical_raw_source\n" + " WITH (\n" + " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" + " ) LIKE historical_raw_source_template;"); final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute(); output.print(); } } {code} and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory: {code:java} one two three four five six seven eight nine ten {code} {{The print results are:}} {{}} {code:java} +----+--------------------------------+ | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | | +I | ten | +----+--------------------------------+ 10 rows 
in set {code} > RowData produced by LineBytesInputFormat is reused, but > DeserializationSchemaAdapter#Reader only shallow copies produced data, thus > result will always be the last row value > ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-29039 > URL: https://issues.apache.org/jira/browse/FLINK-29039 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem > Affects Versions: 1.15.1 > Environment: This issue was discovered on MacOS Big Sur. > Reporter: Marco A. Villalobos > Priority: Major > > RowData produced by LineBytesInputFormat is reused, but > DeserializationSchemaAdapter#Reader only shallow copies produced data, thus > result will always be the last row value. > > Given this program: > {code:java} > package mvillalobos.bug; > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.TableResult; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import static org.apache.flink.table.api.Expressions.$; > public class IsThisABatchSQLBug { > public static void main(String[] args) { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" + > " `file.path` STRING NOT NULL METADATA,\n" + > " `file.name` STRING NOT NULL METADATA,\n" + > " `file.size` BIGINT NOT NULL METADATA,\n" + > " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL > METADATA,\n" + > " line STRING\n" + > " ) WITH (\n" + > " 'connector' = 'filesystem', \n" + > " 'format' = 'raw'\n" + > " );"); > tableEnv.executeSql("CREATE TABLE historical_raw_source\n" + > " WITH 
(\n" + > " 'path' = > '/Users/minmay/dev/mvillalobos/historical/data'\n" + > " ) LIKE historical_raw_source_template;"); final > TableResult output = > tableEnv.from("historical_raw_source").select($("line")).execute(); > output.print(); > } > } {code} > and this sample.csv file in the > '/Users/minmay/dev/mvillalobos/historical/data' directory: > {code:java} > one > two > three > four > five > six > seven > eight > nine > ten {code} > The print results are: > {code:java} > +----+--------------------------------+ > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > +----+--------------------------------+ > 10 rows in set {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)