No. But I do not want any 'parquet'. I need CSV.
 
Which code should created file?
 
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        
        tEnv.executeSql("CREATE TABLE some_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
       
        //This is to create file?:
       // 'Crete table' means 'create file'? Car is a bicycle?
 
        tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_name1 STRING, \n"
                + "    column_name2 DOUBLE \n"
                + "    ) WITH ( \n"
                + "    'connector'='filesystem', \n"
                + "    'path'='file:///C:/temp/test5.txt', \n"
                + "    'format'='csv' \n"
                + "    )");
        
        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from some_table");
 
   It does not work anyway   
 
Sent: Monday, July 11, 2022 at 5:23 PM
From: "Xuyang" <xyzhong...@163.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re:Re: Is it possible to save Table to CSV?
Hi, did you add the dependency of parquet[1]?

[1] https://mvnrepository.com/artifact/org.apache.flink/flink-sql-parquet
 

在 2022-07-11 18:33:38,pod...@gmx.com 写道:

This example?:
 
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
    user_id,
    order_amount,
    DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
    DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

 
CREATE TABLE kafka_table - creates table, not file
CREATE TABLE fs_table - creates table, not file
INSERT INTO fs_table - inserts into table from another table
 
I do not see any create file section.
 
Anyway:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath.
 
 
Sent: Monday, July 11, 2022 at 10:31 AM
From: "Lijie Wang" <wangdachui9...@gmail.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Is it possible to save Table to CSV?
You can use the FileSink and set the format to csv. An example of FileSink: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
 
Best,
Lijie
 
<pod...@gmx.com> 于2022年7月11日周一 16:16写道:
 
If I create dynamic table with:
 
CREATE TABLE some_table (name STRING, score INT)
WITH (
  'format' = 'csv',
  '...'
);
//do some other stuff here
 
Then how to save table result to CSV file?
 
Best,
 
Mike
 
 

Reply via email to