Hi Mike,

I think that's because you haven't enabled checkpointing. Once you
enable it, the problem should be resolved.
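
For reference, a minimal sketch of enabling checkpointing from the Table
API. It assumes a streaming-mode TableEnvironment and needs the Flink
table dependencies on the classpath; the class name CheckpointingSketch
and the 10-second interval are arbitrary example choices, not values
taken from your program.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) {
        // Assumption: streaming mode, where checkpoints are taken periodically.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Take a checkpoint every 10 seconds (example value only).
        tEnv.getConfig().getConfiguration()
                .setString("execution.checkpointing.interval", "10 s");

        // ... CREATE TABLE / INSERT INTO statements would follow here ...
    }
}
```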

Best regards,

Martijn

On Wed, Aug 3, 2022 at 9:01 PM <pod...@gmx.com> wrote:

>
> Thank you very much, Martijn, for dedicating your time to help me!
> I'm new to this subject and took that example from somewhere on the
> Internet. The problem for people like me is that Flink syntax changes
> quite significantly from version to version. So here it is not
> 'connector.type' but 'connector', etc.
>
> An additional problem was that there was no error indicating that something
> was wrong, and on top of that the 'select from' in the following lines
> displayed results from the table...
>
> Anyway, I was expecting a single file 'test5.txt' as a result but got a
> file for each row.
>
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
> ...
>
>
> Can it be just one file?
> Best,
>
> Mike
>
>
> Sent: Wednesday, August 03, 2022 at 4:03 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> I've verified your code locally and it doesn't work indeed, at least not
> with the latest Flink version (I've tested it with Flink 1.15). There are a
> couple of reasons for that:
>
> 1. You've mentioned in this thread that there's no problem with the
> 'csv.field-delimiter'. There is actually, because the default is a , and
> not a ; as documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
> 2. When adding this option, Flink wouldn't compile because the SQL
> statement uses options that are different than documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/.
> You have connector.type, connector.path and format.type listed. It should
> be connector, path and format.
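
The renames in point 2 can be sketched as a small lookup. This is only an
illustration: LegacyOptionKeys and its methods are hypothetical helpers,
not part of Flink, and the map covers just the three keys named in this
thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class LegacyOptionKeys {
    // Legacy keys mentioned in this thread, mapped to the documented ones.
    static final Map<String, String> RENAMES = Map.of(
            "connector.type", "connector",
            "connector.path", "path",
            "format.type", "format");

    // Returns a copy of the options with legacy keys renamed; other keys are kept.
    static Map<String, String> migrate(Map<String, String> options) {
        Map<String, String> migrated = new LinkedHashMap<>();
        options.forEach((key, value) -> migrated.put(RENAMES.getOrDefault(key, key), value));
        return migrated;
    }

    // Renders the options as a WITH (...) clause for a CREATE TABLE statement.
    static String withClause(Map<String, String> options) {
        StringJoiner clause = new StringJoiner(", ", "WITH (", ")");
        options.forEach((key, value) -> clause.add("'" + key + "' = '" + value + "'"));
        return clause.toString();
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("connector.type", "filesystem");
        legacy.put("connector.path", "file:///C:/temp/test4.txt");
        legacy.put("format.type", "csv");
        legacy.put("csv.field-delimiter", ";");
        System.out.println(withClause(migrate(legacy)));
    }
}
```
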
>
> In the end, I used the following code and the expected result was properly
> written:
>
>         tEnv.executeSql(
>                 "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) "
>                 + "WITH ('connector' = 'filesystem', "
>                 + "'path' = 'file:///C:/temp/test4.txt', "
>                 + "'format' = 'csv', 'csv.field-delimiter' = ';')");
>
> Best regards,
>
> Martijn
>
> On Tue, Aug 2, 2022 at 00:42, <pod...@gmx.com> wrote:
>
> No, I do not have it
>
>
>
> Sent: Monday, August 01, 2022 at 4:43 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> That's Flink's fault-tolerance mechanism, see
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
>
>
> On Mon, Aug 1, 2022 at 16:37, <pod...@gmx.com> wrote:
>
> What's that?
>
>
>
> Sent: Monday, August 01, 2022 at 2:49 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> Do you have checkpointing enabled?
>
>
> On Sat, Jul 30, 2022 at 17:31, <pod...@gmx.com> wrote:
>
> Thanks David, but there's no problem with that (probably ";" is the default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> The problem is saving to CSV.
>
>
>
> Sent: Saturday, July 30, 2022 at 3:33 PM
> From: "David Anderson" <dander...@apache.org>
> To: pod...@gmx.com
> Cc: "user" <user@flink.apache.org>
> Subject: Re: Why this example does not save anything to file?
>
> You need to add
>
> 'csv.field-delimiter'=';'
>
> to the definition of Table1 so that the input from test4.txt can be
> correctly parsed:
>          tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
>                  + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', "
>                  + "'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = "
>                  + "'csv', 'csv.field-delimiter'=';')");
>
> Cheers,
> David
>
> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
>
> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
>         tEnv.executeSql("CREATE TABLE fs_table ("
>                 + "    column_nameA STRING, "
>                 + "    column_nameB DOUBLE "
>                 + "    ) WITH ( "
>                 + "    'connector'='filesystem', "
>                 + "    'path'='file:///C:/temp/test5.txt', "
>                 + "    'format'='csv', "
>                 + " 'csv.field-delimiter'=';', "
>                 + " 'sink.partition-commit.delay'='1 s', "
>                 + " 'sink.partition-commit.policy.kind'='success-file'"
>                 + "    )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, "
>                 + "column_name2 from Table1");
>
> I did. Nothing new - still does not work.
>
>
>
>
> Sent: Tuesday, July 26, 2022 at 4:00 PM
> From: "Gil De Grove" <gil.degr...@euranova.eu>
> To: "Weihua Hu" <huweihua....@gmail.com>
> Cc: pod...@gmx.com, "user" <user@flink.apache.org>
> Subject: Re: Why this example does not save anything to file?
>
> Hello,
>
> I may be really wrong about this, but from what I see in the source file,
> you are using a semicolon to separate the values.
> This probably means that you should set csv.field-delimiter to `;` to
> make your example work properly.
>
> Have you tried that configuration in the CSV connector options of your
> CREATE TABLE statement?
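
To illustrate why the delimiter matters for a line such as "aa; 23", here
is a small sketch. It uses plain String.split rather than Flink's CSV
format, so it only shows how many fields each delimiter yields;
DelimiterDemo and splitFields are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class DelimiterDemo {
    // Splits one line on a literal delimiter character and trims each field.
    static List<String> splitFields(String line, char delimiter) {
        String[] parts = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        return Arrays.stream(parts).map(String::trim).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String line = "aa; 23";
        // With a comma the whole line stays one field, so a two-column schema cannot match.
        System.out.println(splitFields(line, ',').size());
        // With a semicolon the line yields the two expected fields.
        System.out.println(splitFields(line, ';').size());
    }
}
```
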
>
> Regards,
> Gil
>
> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
> Hi,
>
>
> Can you see any exception logs?
> Where is this code running? Is it a standalone cluster with one
> TaskManager?
>
>
>
> Best,
> Weihua
>
> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>
> If I understand correctly, this is how I can save to CSV:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>
> So my code is (read from file, save to file):
>
>
>
> package flinkCSV;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> public class flinkCSV {
>     public static void main(String[] args) throws Exception {
>
>         //register and create table
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 //.inStreamingMode()
>                 .inBatchMode()
>                 .build();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
>                 + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', "
>                 + "'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>         .execute()
>         .print();
>
>         tEnv.executeSql("CREATE TABLE fs_table ("
>                 + "    column_nameA STRING, "
>                 + "    column_nameB DOUBLE "
>                 + "    ) WITH ( \n"
>                 + "    'connector'='filesystem', "
>                 + "    'path'='file:///C:/temp/test5.txt', "
>                 + "    'format'='csv', "
>                 + "  'sink.partition-commit.delay'='1 s', "
>                 + "  'sink.partition-commit.policy.kind'='success-file'"
>                 + "    )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, "
>                 + "column_name2 from Table1");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>         .execute()
>         .print();
>
>      }
> }
>
> Source file (test4.txt) is:
>
> aa; 23
> bb; 657.9
> cc; 55
>
> test5.txt is not created, and the select from fs_table gives null.
>
>
