That's Flink fault-tolerance mechanism, see https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
Op ma 1 aug. 2022 om 16:37 schreef <pod...@gmx.com>: > What's that? > > > *Sent:* Monday, August 01, 2022 at 2:49 PM > *From:* "Martijn Visser" <martijnvis...@apache.org> > *To:* pod...@gmx.com > *Cc:* user@flink.apache.org > *Subject:* Re: Why this example does not save anything to file? > Do you have checkpointing enabled? > > Op za 30 jul. 2022 om 17:31 schreef <pod...@gmx.com>: > >> Thanks David but there's no problem with that (probably ";" is default >> separator). >> I can read the file and insert into "Table1" (I said that in my mail). >> Problem is to save to CSV. >> >> >> *Sent:* Saturday, July 30, 2022 at 3:33 PM >> *From:* "David Anderson" <dander...@apache.org> >> *To:* pod...@gmx.com >> *Cc:* "user" <user@flink.apache.org> >> *Subject:* Re: Why this example does not save anything to file? >> You need to add >> >> 'csv.field-delimiter'=';' >> >> to the definition of Table1 so that the input from test4.txt can be >> correctly parsed: >> >> tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, >> column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', >> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', >> 'csv.field-delimiter'=';')"); >> >> Cheers, >> David >> >> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote: >> >>> Hi, >>> >>> you mean adding: >>> >>> " 'csv.field-delimiter'=';', " >>> >>> like: >>> >>> tEnv.executeSql("CREATE TABLE fs_table (" >>> + " column_nameA STRING, " >>> + " column_nameB DOUBLE " >>> + " ) WITH ( " >>> + " 'connector'='filesystem', " >>> + " 'path'='file:///C:/temp/test5.txt', " >>> + " 'format'='csv', " >>> + " 'csv.field-delimiter'=';', " >>> + " 'sink.partition-commit.delay'='1 s', " >>> + " 'sink.partition-commit.policy.kind'='success-file'" >>> + " )"); >>> >>> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, >>> column_name2 from Table1"); >>> >>> I did. Nothing new - still does not work. >>> >>> >>> >>> *Sent:* Tuesday, July 26, 2022 at 4:00 PM >>> *From:* "Gil De Grove" <gil.degr...@euranova.eu> >>> *To:* "Weihua Hu" <huweihua....@gmail.com> >>> *Cc:* pod...@gmx.com, "user" <user@flink.apache.org> >>> *Subject:* Re: Why this example does not save anything to file? >>> Hello, >>> >>> I may be really wrong with this, but from what I get in the source file, >>> you are using a semi-column to separate the value. >>> This probably means that you should set the csv.field-delimiter to `;` >>> to make your example work properly. >>> >>> Have you tried with that configuration in your create table csv >>> connector option? >>> >>> Regards, >>> Gil >>> >>> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> Can you see any exception logs? >>>> Where is this code running? is it a standalone cluster with one >>>> TaskManager? >>>> >>>> >>>> Best, >>>> Weihua >>>> >>>> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote: >>>> >>>>> If I get it correctly this is the way how I can save to CSV: >>>>> >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example >>>>> >>>>> So my code is (read from file, save to file): >>>>> >>>>> >>>>> *package flinkCSV;* >>>>> >>>>> *import org.apache.flink.table.api.EnvironmentSettings; import >>>>> org.apache.flink.table.api.TableEnvironment;* >>>>> *public class flinkCSV {* >>>>> * public static void main(String[] args) throws Exception {* >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> * //register and create table >>>>> EnvironmentSettings settings = EnvironmentSettings >>>>> .newInstance() //.inStreamingMode() >>>>> .inBatchMode() .build();* >>>>> * final TableEnvironment tEnv = >>>>> TableEnvironment.create(settings);* >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> * tEnv.executeSql("CREATE TABLE Table1 (column_name1 >>>>> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', >>>>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')"); >>>>> tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM >>>>> Table1") .execute() .print(); * >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> * tEnv.executeSql("CREATE TABLE fs_table (" + >>>>> " column_nameA STRING, " + " column_nameB DOUBLE " >>>>> + " ) WITH ( \n" + " >>>>> 'connector'='filesystem', " + " >>>>> 'path'='file:///C:/temp/test5.txt', " + " >>>>> 'format'='csv', " + " 'sink.partition-commit.delay'='1 >>>>> s', " + " >>>>> 'sink.partition-commit.policy.kind'='success-file'" + " >>>>> )"); tEnv.executeSql("INSERT INTO fs_table SELECT >>>>> column_name1, column_name2 from Table1"); >>>>> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table") >>>>> .execute() .print(); } }* >>>>> >>>>> Source file (test4.txt) is: >>>>> >>>>> aa; 23 >>>>> bb; 657.9 >>>>> cc; 55 >>>>> >>>>> test5.txt is not created, select from fs_table gives null >>>>> >>>>> >>>>