No. But I do not want any 'parquet'. I need CSV.
Which code should created file?
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE some_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
//This is to create file?:
// 'Crete table' means 'create file'? Car is a bicycle?
tEnv.executeSql("CREATE TABLE fs_table ("
+ " column_name1 STRING, \n"
+ " column_name2 DOUBLE \n"
+ " ) WITH ( \n"
+ " 'connector'='filesystem', \n"
+ " 'path'='file:///C:/temp/test5.txt', \n"
+ " 'format'='csv' \n"
+ " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from some_table");
+ " column_name1 STRING, \n"
+ " column_name2 DOUBLE \n"
+ " ) WITH ( \n"
+ " 'connector'='filesystem', \n"
+ " 'path'='file:///C:/temp/test5.txt', \n"
+ " 'format'='csv' \n"
+ " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from some_table");
It does not work anyway
Sent: Monday, July 11, 2022 at 5:23 PM
From: "Xuyang" <xyzhong...@163.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re:Re: Is it possible to save Table to CSV?
From: "Xuyang" <xyzhong...@163.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re:Re: Is it possible to save Table to CSV?
Hi, did you add the dependency of parquet[1]?
[1] https://mvnrepository.com/artifact/org.apache.flink/flink-sql-parquet
[1] https://mvnrepository.com/artifact/org.apache.flink/flink-sql-parquet
在 2022-07-11 18:33:38,pod...@gmx.com 写道:
This example?:CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND ) WITH (...); CREATE TABLE fs_table ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='...', 'format'='parquet', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='success-file' ); -- streaming sql, insert into file system table INSERT INTO fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table; -- batch sql, select with partition pruning SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
CREATE TABLE kafka_table - creates table, not file
CREATE TABLE fs_table - creates table, not file
INSERT INTO fs_table - inserts into table from another table
I do not see any create file section.
Anyway:Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath.Sent: Monday, July 11, 2022 at 10:31 AM
From: "Lijie Wang" <wangdachui9...@gmail.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Is it possible to save Table to CSV?You can use the FileSink and set the format to csv. An example of FileSink: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-exampleBest,Lijie<pod...@gmx.com> 于2022年7月11日周一 16:16写道:If I create dynamic table with:CREATE TABLE some_table (name STRING, score INT) WITH ( 'format' = 'csv', '...' );
//do some other stuff hereThen how to save table result to CSV file?Best,Mike