1.
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String sourceTableDDL = "CREATE TABLE fs_table (" +
" user_id
STRING," +
" order_amount
DOUBLE," +
" dt
TIMESTAMP(3)," +
" pt AS
PROCTIME() " +
" ) WITH (" +
"
'connector'='filesystem'," +
"
'path'='D:\\Program
Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv',"
+
" 'format'='csv'"
+
" )";
bsTableEnv.executeSql(sourceTableDDL);
bsTableEnv.executeSql("select * from
fs_table").print();
2. csv
order.csv
zhangsan,12.34,2020-08-03 12:23:50
lisi,234.67,2020-08-03 12:25:50
wangwu,57.6,2020-08-03 12:25:50
zhaoliu,345,2020-08-03 12:28:50
3.
- Source: FileSystemTableSource(user_id, order_amount, dt, pt) -
Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -
SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from
RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
at
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
at
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected
but was 3.
at
org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
at
org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
at
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
... 5 more