Re: Why this example does not save anything to file?
Thank you. So I understand: "this is normal Flink behavior, live with it" :-)

M.

Sent: Wednesday, August 31, 2022 at 9:37 PM
From: "David Anderson"
To: pod...@gmx.com
Cc: "user"
Subject: Re: Why this example does not save anything to file?

With the parallelism set to 2, you will get 2 files. More than 2, actually: with hourly buckets, for example, you'll get 2 files per hour. If the I/O bandwidth of a single instance is sufficient to handle your expected throughput, then you can set the parallelism of the sink (or the entire pipeline) to one. If it's not, then you'll be glad to have multiple instances each handling a slice of the job and writing to independent filesystems.

David
Re: Why this example does not save anything to file?
With the parallelism set to 2, you will get 2 files. More than 2, actually: with hourly buckets, for example, you'll get 2 files per hour.

If the I/O bandwidth of a single instance is sufficient to handle your expected throughput, then you can set the parallelism of the sink (or the entire pipeline) to one. If it's not, then you'll be glad to have multiple instances each handling a slice of the job and writing to independent filesystems.

David

On Wed, Aug 31, 2022 at 8:44 AM <pod...@gmx.com> wrote:
> Doesn't it depend on 'sink.parallelism'?
> If I set 'sink.parallelism' = '2' I get two files; with 'sink.parallelism' = '1', just one file...
>
> But I think that reduces the number of tasks, so it will have a negative impact on performance :-(
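As a sketch of what David describes, the Java helper below builds a filesystem sink DDL that pins the sink to a single writer through the `sink.parallelism` option Mike mentions. It only assembles the SQL text; the helper name `sinkDdl`, the directory path `file:///C:/temp/test5` and the column names are illustrative assumptions borrowed loosely from Mike's example, and the result would be passed to `tEnv.executeSql(...)` in a real job.

```java
import java.util.StringJoiner;

// Hypothetical helper: builds the Flink SQL DDL for a CSV filesystem sink with an
// explicit sink parallelism. It only produces text; running it needs a TableEnvironment.
final class SinkDdl {
    private SinkDdl() {}

    static String sinkDdl(String table, String path, char delimiter, int sinkParallelism) {
        if (sinkParallelism < 1) {
            throw new IllegalArgumentException("sink parallelism must be >= 1");
        }
        StringJoiner options = new StringJoiner(", ", "WITH (", ")");
        options.add("'connector' = 'filesystem'");
        options.add("'path' = '" + path + "'"); // a directory: part files are written inside it
        options.add("'format' = 'csv'");
        options.add("'csv.field-delimiter' = '" + delimiter + "'");
        // Fewer writer subtasks means fewer part files, at the cost of write throughput.
        options.add("'sink.parallelism' = '" + sinkParallelism + "'");
        return "CREATE TABLE " + table + " (column_nameA STRING, column_nameB DOUBLE) " + options;
    }

    public static void main(String[] args) {
        String ddl = sinkDdl("fs_table", "file:///C:/temp/test5", ';', 1);
        System.out.println(ddl);
        // With Flink on the classpath: tEnv.executeSql(ddl);
    }
}
```

Setting `'sink.parallelism' = '1'` answers Mike's question directly; as David points out, it is only a good trade-off when a single writer can keep up with the input rate.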
Re: Why this example does not save anything to file?
Doesn't it depend on 'sink.parallelism'? If I set 'sink.parallelism' = '2' I get two files; with 'sink.parallelism' = '1', just one file...

But I think that reduces the number of tasks, so it will have a negative impact on performance :-(

Sent: Tuesday, August 30, 2022 at 3:22 PM
From: "Martijn Visser"
To: pod...@gmx.com, user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Hi Mike,

I think that's caused by you not having enabled checkpointing. If you enable it, that should resolve the issue.

Best regards,

Martijn
Re: Why this example does not save anything to file?
Hi Mike,

I think that's caused by you not having enabled checkpointing. If you enable it, that should resolve the issue.

Best regards,

Martijn

On Wed, Aug 3, 2022 at 9:01 PM <pod...@gmx.com> wrote:
> Thank you very much, Martijn, for dedicating your productive time to helping me!
> I'm a complete newbie in this subject; I took that example from somewhere on the Internet. A problem for people like me is that Flink syntax changes quite significantly from version to version. So here it's not 'connector.type' but 'connector', etc.
>
> An additional problem was that there was no error saying something was wrong, and in addition the 'select from' in the next lines displayed the result from the table...
>
> Anyway, I was expecting a single file 'test5.txt' as the result, but got a file for each row:
>
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
> ...
>
> Can it be just one file?
>
> Best,
>
> Mike
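Martijn's suggestion can be checked from outside the job by looking at what is actually in the sink directory. In streaming mode, Flink's file sink keeps part files uncommitted until a checkpoint completes, so a job without checkpointing can leave only hidden in-progress files behind. The sketch below assumes that uncommitted files start with a dot and contain `.inprogress`, which is an assumption about the local writer's naming rather than something stated in this thread; verify it against your own output. Mike's program uses `inBatchMode()`, where, as far as I know, checkpointing does not apply and files are committed when the job finishes. The class name `PartFileCheck` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical diagnostic: sorts the entries of a sink output directory into
// committed part files and (assumed) uncommitted in-progress files.
final class PartFileCheck {
    private PartFileCheck() {}

    static final class Result {
        final List<String> committed = new ArrayList<>();
        final List<String> inProgress = new ArrayList<>();
    }

    // Assumption: in-progress files are hidden (leading '.') and contain ".inprogress".
    static boolean isInProgress(String fileName) {
        return fileName.startsWith(".") && fileName.contains(".inprogress");
    }

    static Result scan(Path dir) throws IOException {
        Result result = new Result();
        try (Stream<Path> entries = Files.list(dir)) {
            entries.map(p -> p.getFileName().toString())
                    .sorted()
                    .forEach(name -> {
                        if (isInProgress(name)) {
                            result.inProgress.add(name);
                        } else if (name.startsWith("part-")) {
                            result.committed.add(name);
                        }
                        // Anything else (for example a _SUCCESS marker) is ignored.
                    });
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        Result r = scan(dir);
        System.out.println("committed part files: " + r.committed);
        System.out.println("in-progress files:    " + r.inProgress);
        if (r.committed.isEmpty() && !r.inProgress.isEmpty()) {
            System.out.println("Only uncommitted output: check whether checkpointing is enabled.");
        }
    }
}
```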
Re: Why this example does not save anything to file?
Thank you very much, Martijn, for dedicating your productive time to helping me!
I'm a complete newbie in this subject; I took that example from somewhere on the Internet. A problem for people like me is that Flink syntax changes quite significantly from version to version. So here it's not 'connector.type' but 'connector', etc.

An additional problem was that there was no error saying something was wrong, and in addition the 'select from' in the next lines displayed the result from the table...

Anyway, I was expecting a single file 'test5.txt' as the result, but got a file for each row:

part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
...

Can it be just one file?

Best,

Mike

Sent: Wednesday, August 03, 2022 at 4:03 PM
From: "Martijn Visser"
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

I've verified your code locally and it indeed doesn't work, at least not with the latest Flink version (I've tested it with Flink 1.15). There are a couple of reasons for that:

1. You've mentioned in this thread that there's no problem with the 'csv.field-delimiter'. There actually is, because the default is a comma (,) and not a semicolon (;), as documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL statement uses options that are different from what is documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/. You have connector.type, connector.path and format.type listed. It should be connector, path and format.

In the end, I used the following code and the expected result was properly written:

tEnv.executeSql(
    "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");

Best regards,

Martijn
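The file names Mike lists follow a `part-<uuid>-task-<n>-file-<m>` pattern, which suggests at least one file per writing task rather than one file per row. The hypothetical helper below extracts the task index with a regular expression and counts files per task; the pattern is inferred from the two names quoted in this thread, not taken from Flink documentation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts part files per writing task, using the file-name
// pattern seen in this thread: part-<uuid>-task-<n>-file-<m>.
final class PartFilesPerTask {
    private static final Pattern PART_FILE =
            Pattern.compile("^part-[0-9a-f-]+-task-(\\d+)-file-\\d+$");

    private PartFilesPerTask() {}

    static Map<Integer, Integer> filesPerTask(List<String> fileNames) {
        Map<Integer, Integer> counts = new TreeMap<>(); // sorted by task index
        for (String name : fileNames) {
            Matcher m = PART_FILE.matcher(name);
            if (m.matches()) {
                counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0",
                "part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0");
        System.out.println(filesPerTask(names)); // {0=1, 1=1}
    }
}
```

Two tasks with one file each is consistent with a sink parallelism of 2, which is where the thread ends up.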
Re: Why this example does not save anything to file?
I've verified your code locally and it indeed doesn't work, at least not with the latest Flink version (I've tested it with Flink 1.15). There are a couple of reasons for that:

1. You've mentioned in this thread that there's no problem with the 'csv.field-delimiter'. There actually is, because the default is a comma (,) and not a semicolon (;), as documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL statement uses options that are different from what is documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/. You have connector.type, connector.path and format.type listed. It should be connector, path and format.

In the end, I used the following code and the expected result was properly written:

tEnv.executeSql(
    "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");

Best regards,

Martijn

On Tue, Aug 2, 2022 at 00:42 <pod...@gmx.com> wrote:
> No, I do not have it.
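Martijn's second point is a key rename. The sketch below rewrites the three legacy keys from Mike's DDL (`connector.type`, `connector.path`, `format.type`) to the names Martijn gives (`connector`, `path`, `format`) and leaves every other key, such as `csv.field-delimiter`, untouched. The mapping covers only the keys discussed in this thread and is not a complete migration table; the class name `LegacyOptions` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renames the legacy connector option keys used in this thread.
final class LegacyOptions {
    private LegacyOptions() {}

    // Only the three renames discussed in the thread.
    private static final Map<String, String> RENAMES = Map.of(
            "connector.type", "connector",
            "connector.path", "path",
            "format.type", "format");

    static Map<String, String> migrate(Map<String, String> legacy) {
        Map<String, String> result = new LinkedHashMap<>(); // keeps the original key order
        legacy.forEach((key, value) -> result.put(RENAMES.getOrDefault(key, key), value));
        return result;
    }

    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "WITH (", ")");
        options.forEach((k, v) -> joiner.add("'" + k + "' = '" + v + "'"));
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("connector.type", "filesystem");
        legacy.put("connector.path", "file:///C:/temp/test4.txt");
        legacy.put("format.type", "csv");
        legacy.put("csv.field-delimiter", ";");
        System.out.println(withClause(migrate(legacy)));
    }
}
```

The printed clause matches the `WITH (...)` part of the DDL Martijn reports as working.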
Re: Why this example does not save anything to file?
No, I do not have it.

Sent: Monday, August 01, 2022 at 4:43 PM
From: "Martijn Visser"
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

That's Flink's fault-tolerance mechanism, see https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/

On Mon, Aug 1, 2022 at 16:37 <pod...@gmx.com> wrote:
What's that?

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Do you have checkpointing enabled?

On Sat, Jul 30, 2022 at 17:31 <pod...@gmx.com> wrote:
Thanks David, but there's no problem with that (probably ";" is the default separator). I can read the file and insert into "Table1" (I said that in my mail). The problem is saving to CSV.

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson" <dander...@apache.org>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

You need to add

'csv.field-delimiter'=';'

to the definition of Table1 so that the input from test4.txt can be correctly parsed:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

Cheers,
David

On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
Hi,

you mean adding:

" 'csv.field-delimiter'=';', "

like:

tEnv.executeSql("CREATE TABLE fs_table ("
    + " column_nameA STRING, "
    + " column_nameB DOUBLE "
    + " ) WITH ( "
    + " 'connector'='filesystem', "
    + " 'path'='file:///C:/temp/test5.txt', "
    + " 'format'='csv', "
    + " 'csv.field-delimiter'=';', "
    + " 'sink.partition-commit.delay'='1 s', "
    + " 'sink.partition-commit.policy.kind'='success-file'"
    + " )");

tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

I did. Nothing new; it still does not work.

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" <gil.degr...@euranova.eu>
To: "Weihua Hu" <huweihua@gmail.com>
Cc: pod...@gmx.com, "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

Hello,

I may be really wrong with this, but from what I see in the source file, you are using a semicolon to separate the values. This probably means that you should set the csv.field-delimiter to `;` to make your example work properly.

Have you tried that configuration in your CREATE TABLE csv connector options?

Regards,
Gil

On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua@gmail.com> wrote:
Hi,

Can you see any exception logs? Where is this code running? Is it a standalone cluster with one TaskManager?

Best,
Weihua

On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
If I get it correctly, this is the way I can save to CSV:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example

So my code is (read from file, save to file):

package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {
    public static void main(String[] args) throws Exception {
        // register and create table
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            //.inStreamingMode()
            .inBatchMode()
            .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");

        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
            .execute()
            .print();

        tEnv.executeSql("CREATE TABLE fs_table ("
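Gil's and David's delimiter advice can be illustrated without Flink. The sketch below splits a semicolon-separated line (the row `abc;1.5` is hypothetical, since the contents of test4.txt are not shown in the thread) first with the default comma and then with a semicolon. With `,` the whole line lands in a single field, so the DOUBLE column has nothing to parse. It uses a naive `String.split`, which is far simpler than a real CSV parser (no quoting or escaping), and the class name `DelimiterDemo` is hypothetical.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical demo: why a ';'-separated row does not fit a (STRING, DOUBLE) schema
// when it is split on the default ',' delimiter.
final class DelimiterDemo {
    private DelimiterDemo() {}

    // Naive split: no quote handling, unlike a real CSV parser.
    static String[] fields(String line, char delimiter) {
        return line.split(Pattern.quote(String.valueOf(delimiter)), -1);
    }

    // Returns the parsed second column, or null when the row cannot fill both columns.
    static Double secondColumn(String line, char delimiter) {
        String[] f = fields(line, delimiter);
        if (f.length != 2) {
            return null; // the schema has exactly 2 columns: STRING, DOUBLE
        }
        try {
            return Double.parseDouble(f[1].trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String row = "abc;1.5"; // hypothetical row in the style of test4.txt
        System.out.println(Arrays.toString(fields(row, ',')) + " -> " + secondColumn(row, ','));
        System.out.println(Arrays.toString(fields(row, ';')) + " -> " + secondColumn(row, ';'));
    }
}
```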
Re: Why this example does not save anything to file?
That's Flink's fault-tolerance mechanism, see https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/

On Mon, Aug 1, 2022 at 16:37 <pod...@gmx.com> wrote:
> What's that?
Re: Why this example does not save anything to file?
What's that?

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser"
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?
Do you have checkpointing enabled?
Re: Why this example does not save anything to file?
Do you have checkpointing enabled?

On Sat, 30 Jul 2022 at 17:31, wrote:
> Thanks David, but there's no problem with that (probably ";" is the default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> The problem is saving to CSV.
Re: Why this example does not save anything to file?
Thanks David, but there's no problem with that (probably ";" is the default separator). I can read the file and insert into "Table1" (I said that in my mail). The problem is saving to CSV.

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson"
To: pod...@gmx.com
Cc: "user"
Subject: Re: Why this example does not save anything to file?
You need to add

'csv.field-delimiter'=';'

to the definition of Table1 so that the input from test4.txt can be correctly parsed:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

Cheers,
David
Re: Why this example does not save anything to file?
You need to add

'csv.field-delimiter'=';'

to the definition of Table1 so that the input from test4.txt can be correctly parsed:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

Cheers,
David

On Fri, Jul 29, 2022 at 4:15 PM wrote:
> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
> tEnv.executeSql("CREATE TABLE fs_table ("
>     + "column_nameA STRING, "
>     + "column_nameB DOUBLE "
>     + ") WITH ( "
>     + "'connector'='filesystem', "
>     + "'path'='file:///C:/temp/test5.txt', "
>     + "'format'='csv', "
>     + " 'csv.field-delimiter'=';', "
>     + " 'sink.partition-commit.delay'='1 s', "
>     + " 'sink.partition-commit.policy.kind'='success-file'"
>     + ")");
>
> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
>
> I did. Nothing new - still does not work.
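Later in this thread, the legacy keys used for Table1 ('connector.type', 'connector.path', 'format.type') turned out to be part of the problem with Flink 1.15. As a minimal sketch of a Table1 definition that uses the new-style keys already present in the fs_table DDL ('connector', 'path', 'format') together with the delimiter, here is a small plain-Java helper that assembles the DDL string. `FsTableDdl`, `createTable` and `csvOptions` are hypothetical names; the resulting string would be passed to `tEnv.executeSql(...)`. The helper does no quoting or escaping, so option values containing a single quote would need extra handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: builds a CREATE TABLE statement for the filesystem
// connector with new-style option keys. Option values are not escaped.
final class FsTableDdl {

    /** Joins options as 'key'='value' pairs inside a WITH (...) clause. */
    static String createTable(String name, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "'='" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + name + " (" + columns + ") WITH (" + with + ")";
    }

    /** Options for a CSV file read through the filesystem connector. */
    static Map<String, String> csvOptions(String path, String delimiter) {
        Map<String, String> opts = new LinkedHashMap<>(); // preserves insertion order
        opts.put("connector", "filesystem");
        opts.put("path", path);
        opts.put("format", "csv");
        opts.put("csv.field-delimiter", delimiter);
        return opts;
    }

    public static void main(String[] args) {
        String ddl = createTable("Table1", "column_name1 STRING, column_name2 DOUBLE",
                csvOptions("file:///C:/temp/test4.txt", ";"));
        // In the job, this string would be passed to tEnv.executeSql(ddl).
        System.out.println(ddl);
    }
}
```

Keeping the options in a `LinkedHashMap` makes the generated WITH clause list the keys in the order they were added, which keeps the DDL easy to compare with the hand-written versions in this thread.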
Re: Why this example does not save anything to file?
Hi,

you mean adding:

" 'csv.field-delimiter'=';', "

like:

tEnv.executeSql("CREATE TABLE fs_table ("
    + " column_nameA STRING, "
    + " column_nameB DOUBLE "
    + " ) WITH ( "
    + " 'connector'='filesystem', "
    + " 'path'='file:///C:/temp/test5.txt', "
    + " 'format'='csv', "
    + " 'csv.field-delimiter'=';', "
    + " 'sink.partition-commit.delay'='1 s', "
    + " 'sink.partition-commit.policy.kind'='success-file'"
    + " )");

tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

I did. Nothing new - still does not work.

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove"
To: "Weihua Hu"
Cc: pod...@gmx.com, "user"
Subject: Re: Why this example does not save anything to file?
Hello,

I may be really wrong with this, but from what I see in the source file, you are using a semicolon to separate the values. This probably means that you should set the csv.field-delimiter to `;` to make your example work properly.

Have you tried that configuration in the CSV connector options of your CREATE TABLE statement?

Regards,
Gil
Re: Why this example does not save anything to file?
In Eclipse only:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Flink/flink-1.14.4/lib/flink-dist_2.11-1.14.4.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

In Flink logs:

2022-07-29 22:04:15,835 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
2022-07-29 22:04:15,836 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-07-29 22:04:15,949 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'insert-into_default_catalog.default_database.fs_table' (064e2e154028411712786c61a79221bf).

I do not think anything is wrong with these fields - they are correctly declared. I see a similar issue: https://issues.apache.org/jira/browse/FLINK-12410

Is this a bug?

Sent: Tuesday, July 26, 2022 at 3:40 PM
From: "Weihua Hu"
To: pod...@gmx.com
Cc: "user"
Subject: Re: Why this example does not save anything to file?
Hi,

Can you see any exception logs?
Where is this code running? Is it a standalone cluster with one TaskManager?

Best,
Weihua
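The two TypeExtractor lines are INFO messages that name Flink's own `TimestampedFileInputSplit` class, not the columns of Table1 or fs_table. To illustrate what "not all fields are valid POJO fields" refers to, here is a simplified, reflection-based check of the POJO rules from Flink's "Data Types & Serialization" documentation: the class is public, it has a public no-argument constructor, and every non-static, non-transient field is either public or has a getter and a setter. `PojoRuleCheck` and its sample classes are hypothetical and only approximate what `TypeExtractor` does; for example, the sketch ignores superclass fields and does not check that nested classes are static.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Simplified, hypothetical approximation of Flink's POJO rules; this is not
// Flink's TypeExtractor and ignores superclasses and nesting rules.
final class PojoRuleCheck {

    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false; // the class must be public
        }
        try {
            clazz.getConstructor(); // needs a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // static/transient fields are skipped; public fields are fine
            }
            if (!hasGetter(clazz, f) || !hasSetter(clazz, f)) {
                return false; // cf. "does not contain a setter for field ..."
            }
        }
        return true;
    }

    private static String capitalize(String name) {
        return Character.toUpperCase(name.charAt(0)) + name.substring(1);
    }

    private static boolean hasGetter(Class<?> clazz, Field f) {
        for (String prefix : new String[] {"get", "is"}) {
            try {
                Method m = clazz.getMethod(prefix + capitalize(f.getName()));
                if (m.getReturnType().equals(f.getType())) {
                    return true;
                }
            } catch (NoSuchMethodException ignored) {
                // try the next prefix
            }
        }
        return false;
    }

    private static boolean hasSetter(Class<?> clazz, Field f) {
        try {
            clazz.getMethod("set" + capitalize(f.getName()), f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Passes: private field with both a getter and a setter. */
    public static class WithSetter {
        private long modificationTime;
        public long getModificationTime() { return modificationTime; }
        public void setModificationTime(long t) { modificationTime = t; }
    }

    /** Fails: getter only, i.e. no setter for modificationTime. */
    public static class WithoutSetter {
        private long modificationTime;
        public long getModificationTime() { return modificationTime; }
    }
}
```

As the log line itself says, a class that fails these rules is handled as a GenericType, which affects serialization performance; the sketch only shows why such a class is rejected.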
Re: Why this example does not save anything to file?
Hello,

I may be really wrong with this, but from what I see in the source file, you are using a semicolon to separate the values. This probably means that you should set the csv.field-delimiter to `;` to make your example work properly.

Have you tried that configuration in the CSV connector options of your CREATE TABLE statement?

Regards,
Gil

On Tue, 26 Jul 2022 at 15:40, Weihua Hu wrote:
> Hi,
>
> Can you see any exception logs?
> Where is this code running? Is it a standalone cluster with one
> TaskManager?
>
> Best,
> Weihua
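A quick way to see Gil's point outside Flink is to split the three sample lines from test4.txt once on `,` (the CSV format's default field delimiter, as confirmed later in this thread) and once on `;`. This is plain Java string splitting, not Flink's CSV parser, and `DelimiterDemo` is a hypothetical name. The sketch trims the space after `;` before parsing the number; whether Flink's CSV format tolerates that space on its own is not something this example shows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java illustration (not Flink's CSV parser): the same line yields a
// different number of fields depending on the delimiter used to split it.
final class DelimiterDemo {

    static String[] split(String line, char delimiter) {
        // Pattern.quote avoids treating the delimiter as a regex; -1 keeps empty fields.
        return line.split(Pattern.quote(String.valueOf(delimiter)), -1);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("aa; 23", "bb; 657.9", "cc; 55");
        for (String line : lines) {
            int commaFields = split(line, ',').length; // 1: nothing left for the DOUBLE column
            String[] fields = split(line, ';');        // 2: STRING and DOUBLE
            double value = Double.parseDouble(fields[1].trim());
            System.out.println(line + " -> ',' gives " + commaFields
                    + " field(s), ';' gives " + fields.length + " (column_name2 = " + value + ")");
        }
    }
}
```

With `,` every line comes back as a single field, so there is no value for column_name2; with `;` each line splits into the two declared columns.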
Re: Why this example does not save anything to file?
Hi,

Can you see any exception logs?
Where is this code running? Is it a standalone cluster with one TaskManager?

Best,
Weihua

On Tue, Jul 26, 2022 at 4:18 AM wrote:
> If I understand correctly, this is how I can save to CSV:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
Why this example does not save anything to file?
If I understand correctly, this is how I can save to CSV:

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example

So my code is (read from file, save to file):

package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {
    public static void main(String[] args) throws Exception {
        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
                .execute()
                .print();

        tEnv.executeSql("CREATE TABLE fs_table ("
                + " column_nameA STRING, "
                + " column_nameB DOUBLE "
                + " ) WITH ( \n"
                + " 'connector'='filesystem', "
                + " 'path'='file:///C:/temp/test5.txt', "
                + " 'format'='csv', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + " )");

        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

        tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
                .execute()
                .print();
    }
}

Source file (test4.txt) is:

aa; 23
bb; 657.9
cc; 55

test5.txt is not created, and the SELECT from fs_table gives null.