Re: flink 1.11 ddl sql: adding a PROCTIME() column causes a csv read error

2020-07-23 Post by Asahi Lee
filesystem csv [...]
filesystem [...]

-- Original message --
From: "user-zh"

https://issues.apache.org/jira/browse/FLINK-18665

> On 2020-07-23 at 00:07, Asahi Lee <978466...@qq.com> wrote:
> 
> 1. Code
>         StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
>         String sourceTableDDL = "CREATE TABLE fs_table (" +
>                 "  user_id STRING," +
>                 "  order_amount DOUBLE," +
>                 "  dt TIMESTAMP(3)," +
>                 "  pt AS PROCTIME() " +
>                 " ) WITH (" +
>                 "  'connector'='filesystem'," +
>                 "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
>                 "  'format'='csv'" +
>                 " )";
>
>         bsTableEnv.executeSql(sourceTableDDL);
>         bsTableEnv.executeSql("select * from fs_table").print();
> 2. CSV file
> order.csv
> zhangsan,12.34,2020-08-03 12:23:50
> lisi,234.67,2020-08-03 12:25:50
> wangwu,57.6,2020-08-03 12:25:50
> zhaoliu,345,2020-08-03 12:28:50
>
> 3. Exception
>  - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
> java.io.IOException: Failed to deserialize CSV row.
>     at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
>     at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
>     at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
>     at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
>     at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
>     ... 5 more

flink 1.11 ddl sql: adding a PROCTIME() column causes a csv read error

2020-07-22 Post by Asahi Lee
1. Code
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String sourceTableDDL = "CREATE TABLE fs_table (" +
                "  user_id STRING," +
                "  order_amount DOUBLE," +
                "  dt TIMESTAMP(3)," +
                "  pt AS PROCTIME() " +
                " ) WITH (" +
                "  'connector'='filesystem'," +
                "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
                "  'format'='csv'" +
                " )";

        bsTableEnv.executeSql(sourceTableDDL);
        bsTableEnv.executeSql("select * from fs_table").print();
2. CSV file
order.csv
zhangsan,12.34,2020-08-03 12:23:50
lisi,234.67,2020-08-03 12:25:50
wangwu,57.6,2020-08-03 12:25:50
zhaoliu,345,2020-08-03 12:28:50

3. Exception
 - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
    at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
    at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
    ... 5 more
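
The numbers in "4 fields expected but was 3" line up with the DDL and the file: the table declares three physical columns plus the computed column pt, while each line of order.csv holds three values. The following is a minimal, self-contained sketch of that arithmetic in plain Java. It is an illustration only, not Flink's code: the class ArityCheckSketch, the Column type, and the excludeComputed switch are hypothetical names, and the message format is simply copied from the trace above.

```java
import java.util.ArrayList;
import java.util.List;

public class ArityCheckSketch {

    /** Hypothetical column model: physical (stored in the file) or computed (derived by an expression). */
    static final class Column {
        final String name;
        final boolean computed;

        Column(String name, boolean computed) {
            this.name = name;
            this.computed = computed;
        }
    }

    /** Columns of fs_table as declared in the DDL from this thread. */
    static List<Column> fsTableSchema() {
        List<Column> cols = new ArrayList<>();
        cols.add(new Column("user_id", false));
        cols.add(new Column("order_amount", false));
        cols.add(new Column("dt", false));
        cols.add(new Column("pt", true)); // pt AS PROCTIME()
        return cols;
    }

    /** Counts the fields a reader would expect, optionally skipping computed columns. */
    static int expectedFields(List<Column> schema, boolean excludeComputed) {
        int n = 0;
        for (Column c : schema) {
            if (!(excludeComputed && c.computed)) {
                n++;
            }
        }
        return n;
    }

    /** Compares the expected field count with the number of comma-separated values in one line. */
    static String validateArity(int expected, String csvLine) {
        int actual = csvLine.split(",", -1).length;
        if (expected != actual) {
            return "Row length mismatch. " + expected + " fields expected but was " + actual + ".";
        }
        return "ok";
    }

    public static void main(String[] args) {
        String line = "zhangsan,12.34,2020-08-03 12:23:50";
        List<Column> schema = fsTableSchema();
        // Counting the computed column reproduces the message from the trace.
        System.out.println(validateArity(expectedFields(schema, false), line));
        // Counting only physical columns matches the three values in the file.
        System.out.println(validateArity(expectedFields(schema, true), line));
    }
}
```

Run as written, the first call prints the mismatch message and the second prints "ok". This suggests, without proving it, that the computed column is being counted when the CSV row is validated.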