Hi,
 
you mean adding:
 
" 'csv.field-delimiter'=';', "
 
like:
 
        tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( "
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + " 'csv.field-delimiter'=';', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + "    )");
        
        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
 
I did. Nothing new - still does not work.
 
 
 
Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" <gil.degr...@euranova.eu>
To: "Weihua Hu" <huweihua....@gmail.com>
Cc: pod...@gmx.com, "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?
Hello,
 
I may be really wrong with this, but from what I get in the source file, you are using a semi-column to separate the value.
This probably means that you should set the csv.field-delimiter to `;` to make your example work properly.
 
Have you tried with that configuration in your create table csv connector option?
 
Regards,
Gil
 
On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
Hi,
 
Can you see any exception logs? 
Where is this code running? is it a standalone cluster with one TaskManager?
 
 
Best,
Weihua
 
On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
If I get it correctly this is the way how I can save to CSV:
 
So my code is (read from file, save to file):
 
 
package flinkCSV;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class flinkCSV {
    public static void main(String[] args) throws Exception {
        
        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
        .execute()
        .print();
        
        tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( \n"
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + "  'sink.partition-commit.delay'='1 s', "
                + "  'sink.partition-commit.policy.kind'='success-file'"
                + "    )");
        
        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
        .execute()
        .print();
        
     }
}
 
Source file (test4.txt) is:
 
aa; 23
bb; 657.9
cc; 55
 
test5.txt is not created, select from fs_table gives null
 

Reply via email to