Hello,

I may be really wrong with this, but from what I get in the source file,
you are using a semi-column to separate the value.
This probably means that you should set the csv.field-delimiter to `;` to
make your example work properly.

Have you tried with that configuration in your create table csv connector
option?

Regards,
Gil

On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:

> Hi,
>
> Can you see any exception logs?
> Where is this code running? is it a standalone cluster with one
> TaskManager?
>
>
> Best,
> Weihua
>
>
> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>
>> If I get it correctly this is the way how I can save to CSV:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>>
>> So my code is (read from file, save to file):
>>
>>
>> *package flinkCSV;*
>>
>> *import org.apache.flink.table.api.EnvironmentSettings; import
>> org.apache.flink.table.api.TableEnvironment;*
>> *public class flinkCSV {*
>> *    public static void main(String[] args) throws Exception {*
>>
>>
>>
>>
>>
>>
>> *                 //register and create table         EnvironmentSettings
>> settings = EnvironmentSettings                 .newInstance()
>>      //.inStreamingMode()                 .inBatchMode()
>> .build();*
>> *        final TableEnvironment tEnv = TableEnvironment.create(settings);*
>>
>>
>>
>>
>>
>>
>> *                 tEnv.executeSql("CREATE TABLE Table1 (column_name1
>> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>>                  tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM
>> Table1")         .execute()         .print();         *
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *        tEnv.executeSql("CREATE TABLE fs_table ("                 + "
>>  column_nameA STRING, "                 + "    column_nameB DOUBLE "
>>          + "    ) WITH ( \n"                 + "
>>  'connector'='filesystem', "                 + "
>>  'path'='file:///C:/temp/test5.txt', "                 + "
>>  'format'='csv', "                 + "  'sink.partition-commit.delay'='1
>> s', "                 + "
>> 'sink.partition-commit.policy.kind'='success-file'"                 + "
>>  )");                  tEnv.executeSql("INSERT INTO fs_table SELECT
>> column_name1, column_name2 from Table1");
>> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>> .execute()         .print();               } }*
>>
>> Source file (test4.txt) is:
>>
>> aa; 23
>> bb; 657.9
>> cc; 55
>>
>> test5.txt is not created, select from fs_table gives null
>>
>>
>

Reply via email to