Hi, Filesystem connector 支持streaming 写入,streaming 读取 还未支持,所以读取完了就停止。支持streaming 写入从文档上看[1]应该是有计划的
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html> > 在 2020年7月23日,22:05,Asahi Lee <978466...@qq.com> 写道: > > 使用filesystem读取csv作为源,使用流环境,为什么我的程序一执行就停止,而不是等待文件的追加写入,继续计算呢? > 还是filesystem只能用于批操作? > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > > <xbjt...@gmail.com <mailto:xbjt...@gmail.com>>; > 发送时间: 2020年7月23日(星期四) 上午9:55 > 收件人: "user-zh"<user-zh@flink.apache.org > <mailto:user-zh@flink.apache.org>>; > > 主题: Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误 > > > > Hi, Asahi > > 这是一个已知bug[1],filesystem connector上处理计算列有点问题,已经有PR了,会在1.11.2和1.12版本上修复 > > > Best > Leonard Xu > [1] https://issues.apache.org/jira/browse/FLINK-18665 > <https://issues.apache.org/jira/browse/FLINK-18665> > <https://issues.apache.org/jira/browse/FLINK-18665> > <https://issues.apache.org/jira/browse/FLINK-18665>>; > > > 在 2020年7月23日,00:07,Asahi Lee <978466...@qq.com > <mailto:978466...@qq.com>> 写道: > > > > 1. 程序 > > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > > &nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings > bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > > &nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment > bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > > > > > > &nbsp; &nbsp; &nbsp; &nbsp; String sourceTableDDL = > "CREATE TABLE fs_table (" + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; user_id STRING," + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; order_amount DOUBLE," + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; dt TIMESTAMP(3)," + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; pt AS PROCTIME() " + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; " ) WITH (" + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; 'connector'='filesystem'," + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; 'path'='D:\\Program > Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," > + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; "&nbsp; 'format'='csv'" + > > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; " )"; > > > > > > &nbsp; &nbsp; &nbsp; &nbsp; > bsTableEnv.executeSql(sourceTableDDL); > > &nbsp; &nbsp; &nbsp; &nbsp; > bsTableEnv.executeSql("select * from fs_table").print(); > > 2. csv文件 > > order.csv > > zhangsan,12.34,2020-08-03 12:23:50 > > lisi,234.67,2020-08-03 12:25:50 > > wangwu,57.6,2020-08-03 12:25:50 > > zhaoliu,345,2020-08-03 12:28:50 > > > > > > > > 3. 错误 > > &nbsp;- Source: FileSystemTableSource(user_id, order_amount, dt, pt) > -&gt; Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS > pt]) -&gt; SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) > switched from RUNNING to FAILED. > > java.io.IOException: Failed to deserialize CSV row. > > at > org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299) > > at > org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210) > > at > org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > > Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields > expected but was 3. > > at > org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441) > > at > org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244) > > at > org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293) > > ... 5 more