Re: Why this example does not save anything to file?

2022-09-01 Thread podunk
Thank you.
So I understand: 'this is normal Flink behavior - live with it' :-)
 
M.
 


Re: Why this example does not save anything to file?

2022-08-31 Thread David Anderson
With the parallelism set to 2, you will get 2 files. More than 2, actually
-- e.g., with hourly buckets, you'll get 2 files per hour.

If the i/o bandwidth of a single instance is sufficient to handle your
expected throughput, then you can set the parallelism of the sink (or the
entire pipeline) to one. If it's not, then you'll be glad to have multiple
instances each handling a slice of the job and writing to independent
filesystems.

David
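
The file-count arithmetic above can be sketched as follows. This is a minimal illustration, not Flink code: the class and method names are hypothetical, and it assumes every sink subtask writes exactly one part file into every bucket (no file rolling, no subtask left without data). With rolling enabled the real count can be higher.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class PartFileEstimate {

    /**
     * Expected number of part files, assuming every sink subtask writes
     * exactly one file into every bucket (no rolling, no idle subtasks).
     */
    static long expectedPartFiles(int sinkParallelism, int buckets) {
        if (sinkParallelism < 1 || buckets < 0) {
            throw new IllegalArgumentException("parallelism must be >= 1 and buckets >= 0");
        }
        return (long) sinkParallelism * buckets;
    }

    public static void main(String[] args) {
        System.out.println(expectedPartFiles(2, 1));   // 2: parallelism 2, a single bucket
        System.out.println(expectedPartFiles(2, 24));  // 48: parallelism 2, hourly buckets for one day
        System.out.println(expectedPartFiles(1, 24));  // 24: parallelism 1, hourly buckets for one day
    }
}
```

Setting 'sink.parallelism' to '1' therefore gives one file per bucket under these assumptions, at the cost of a single writer handling all output.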


Re: Why this example does not save anything to file?

2022-08-31 Thread podunk


Doesn't it depend on 'sink.parallelism'?
If I set 'sink.parallelism' = '2' I get two files, with 'sink.parallelism' = '1' 
just one file...
 
But I think that by doing that I reduce the number of tasks, so it will have 
a negative impact on performance :-(
 
 


Re: Why this example does not save anything to file?

2022-08-30 Thread Martijn Visser via user
Hi Mike,

I think that's caused by you not having enabled checkpointing. If you
enable that, it should be resolved I think.

Best regards,

Martijn


Re: Why this example does not save anything to file?

2022-08-03 Thread podunk


Thank you very much, Martijn, for dedicating your productive time to helping me!
I'm a newbie in this subject - I took that example from somewhere on the 
Internet. The problem for people like me is that Flink syntax changes quite 
significantly from version to version. So here it is not 'connector.type' but 
'connector', etc.
 
An additional problem was that there was no error indicating that something was 
wrong, and on top of that the 'select from' in the next lines displayed results 
from the table...
 
Anyway, I was expecting a single file 'test5.txt' as a result, but got a file 
for each row.
 
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
...
 

Can it be just one file?
Best,
 
Mike
 


Re: Why this example does not save anything to file?

2022-08-03 Thread Martijn Visser
I've verified your code locally and it doesn't work indeed, at least not
with the latest Flink version (I've tested it with Flink 1.15). There are a
couple of reasons for that:

1. You've mentioned in this thread that there's no problem with the
'csv.field-delimiter'. There actually is, because the default is a comma (,)
and not a semicolon (;), as documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL
statement uses options that are different than those documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/.
You have connector.type, connector.path and format.type listed. It should
be connector, path and format.

In the end, I used the following code and the expected result was properly
written:

        tEnv.executeSql(
                "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) "
                        + "WITH ('connector' = 'filesystem', "
                        + "'path' = 'file:///C:/temp/test4.txt', "
                        + "'format' = 'csv', "
                        + "'csv.field-delimiter' = ';')");

Best regards,

Martijn
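
The three option renames listed above (connector.type, connector.path and format.type becoming connector, path and format) can be sketched as a small key migration. This is only an illustration in plain Java: the class and its methods are hypothetical helpers, not part of Flink, and the resulting WITH clause would still have to be embedded in a CREATE TABLE statement and passed to tEnv.executeSql.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration only; not part of Flink. */
final class OptionKeyMigration {

    // Legacy key -> current key: exactly the three renames named in this thread.
    private static final Map<String, String> RENAMES = Map.of(
            "connector.type", "connector",
            "connector.path", "path",
            "format.type", "format");

    /** Returns a copy of the options with legacy keys renamed; other keys pass through. */
    static Map<String, String> migrate(Map<String, String> legacy) {
        Map<String, String> current = new LinkedHashMap<>();
        legacy.forEach((key, value) -> current.put(RENAMES.getOrDefault(key, key), value));
        return current;
    }

    /** Renders the options as a SQL WITH (...) clause, keeping insertion order. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", ", "WITH (", ")"));
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("connector.type", "filesystem");
        legacy.put("connector.path", "file:///C:/temp/test4.txt");
        legacy.put("format.type", "csv");
        legacy.put("csv.field-delimiter", ";");
        System.out.println(withClause(migrate(legacy)));
    }
}
```

Run on the options from the original post plus the delimiter, this prints the same WITH clause as in the corrected statement above.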


Re: Why this example does not save anything to file?

2022-08-01 Thread podunk
No, I do not have it

 
 

Sent: Monday, August 01, 2022 at 4:43 PM
From: "Martijn Visser" 
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?


That's Flink's fault-tolerance mechanism, see https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
 


On Mon, Aug 1, 2022 at 16:37, <pod...@gmx.com> wrote:




What's that?

 
 

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?



Do you have checkpointing enabled? 

 


On Sat, Jul 30, 2022 at 17:31, <pod...@gmx.com> wrote:




Thanks David, but there's no problem with that (probably ";" is the default separator).

I can read the file and insert into "Table1" (I said that in my mail).

The problem is saving to CSV.

 
 

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson" <dander...@apache.org>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?


You need to add
 

'csv.field-delimiter'=';'

 

to the definition of Table1 so that the input from test4.txt can be correctly parsed:

 
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

 

Cheers,

David
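
Why the delimiter matters can be sketched in plain Java. This is a deliberately simplified illustration, not Flink's CSV parser: the class name and the sample row "abc;1.5" are hypothetical, and quoting and escaping are ignored. It only shows that a semicolon-separated row looks like a single field when split on a comma.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not Flink's CSV parser. */
final class DelimiterDemo {

    /** Counts the fields in a line when split on the given delimiter (no quoting rules). */
    static int fieldCount(String line, char delimiter) {
        String literal = Pattern.quote(String.valueOf(delimiter));
        return line.split(literal, -1).length;
    }

    public static void main(String[] args) {
        String row = "abc;1.5"; // assumed sample row: a STRING and a DOUBLE column
        System.out.println(fieldCount(row, ','));  // 1: the whole row is one field
        System.out.println(fieldCount(row, ';'));  // 2: matches the two declared columns
    }
}
```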

 


On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:




Hi,

 

you mean adding:

 

" 'csv.field-delimiter'=';', "

 

like:

 

    tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( "
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + " 'csv.field-delimiter'=';', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + "    )");
        
    tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

 

I did. Nothing new - still does not work.

 

 
 

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" <gil.degr...@euranova.eu>
To: "Weihua Hu" <huweihua@gmail.com>
Cc: pod...@gmx.com, "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?



Hello,
 

I may be really wrong about this, but from what I see in the source file, you are using a semicolon to separate the values.

This probably means that you should set csv.field-delimiter to `;` to make your example work properly.
 

Have you tried with that configuration in your create table csv connector option?

 

Regards,

Gil

 


On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua@gmail.com> wrote:


Hi,

 

Can you see any exception logs?

Where is this code running? Is it a standalone cluster with one TaskManager?

 

 


Best,
Weihua





 


On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:




If I understand correctly, this is how I can save to CSV:

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example

 

So my code is (read from file, save to file):

 

 


package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {

    public static void main(String[] args) throws Exception {

        
        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            //.inStreamingMode()
            .inBatchMode()
            .build();

        final TableEnvironment tEnv = TableEnvironment.create(settings);

        
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
                .execute()
                .print();
        

    tEnv.executeSql("CREATE TABLE fs_table ("
   

Re: Why this example does not save anything to file?

2022-08-01 Thread Martijn Visser
That's Flink's fault-tolerance mechanism, see
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/

Op ma 1 aug. 2022 om 16:37 schreef :

> What's that?

Re: Why this example does not save anything to file?

2022-08-01 Thread podunk
What's that?

 
 

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" 
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?



Do you have checkpointing enabled? 

 



Re: Why this example does not save anything to file?

2022-08-01 Thread Martijn Visser
Do you have checkpointing enabled?

On Sat, Jul 30, 2022 at 17:31  wrote:

> Thanks David but there's no problem with that (probably ";" is default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> Problem is to save to CSV.
>
>


Re: Why this example does not save anything to file?

2022-07-30 Thread podunk
Thanks David but there's no problem with that (probably ";" is default separator).

I can read the file and insert into "Table1" (I said that in my mail).

Problem is to save to CSV.

 
 

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson" 
To: pod...@gmx.com
Cc: "user" 
Subject: Re: Why this example does not save anything to file?


You need to add
 

'csv.field-delimiter'=';'

 

to the definition of Table1 so that the input from test4.txt can be correctly parsed:

 
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

 

Cheers,

David

 



Re: Why this example does not save anything to file?

2022-07-30 Thread David Anderson
You need to add

'csv.field-delimiter'=';'

to the definition of Table1 so that the input from test4.txt can be
correctly parsed:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING,
column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv',
'csv.field-delimiter'=';')");
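
A rough plain-Java illustration of why the delimiter matters. This is not Flink's CSV parser; `DelimiterDemo` and `splitFields` are made-up names for this sketch. Flink's CSV format uses a comma as its default field delimiter, so a line such as `aa; 23` would be treated as a single field unless `;` is configured:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Plain-Java illustration only; Flink's own CSV parsing is not used here.
final class DelimiterDemo {

    // Split one input line on the given delimiter and trim each resulting field.
    static String[] splitFields(String line, char delimiter) {
        String[] parts = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    public static void main(String[] args) {
        // The three lines of test4.txt from this thread.
        String[] lines = {"aa; 23", "bb; 657.9", "cc; 55"};
        for (String line : lines) {
            String[] byComma = splitFields(line, ',');
            String[] bySemicolon = splitFields(line, ';');
            System.out.println(line
                    + " -> ',' gives " + byComma.length + " field(s), "
                    + "';' gives " + bySemicolon.length + " field(s): "
                    + Arrays.toString(bySemicolon));
        }
    }
}
```

With a two-column schema (STRING, DOUBLE), only the `;` split yields two fields per line that match the declared columns.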

Cheers,
David

On Fri, Jul 29, 2022 at 4:15 PM  wrote:

> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
> tEnv.executeSql("CREATE TABLE fs_table ("
> + "column_nameA STRING, "
> + "column_nameB DOUBLE "
> + ") WITH ( "
> + "'connector'='filesystem', "
> + "'path'='file:///C:/temp/test5.txt', "
> + "'format'='csv', "
> + " 'csv.field-delimiter'=';', "
> + " 'sink.partition-commit.delay'='1 s', "
> + " 'sink.partition-commit.policy.kind'='success-file'"
> + ")");
>
> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1,
> column_name2 from Table1");
>
> I did. Nothing new - still does not work.
>
>
>


Re: Why this example does not save anything to file?

2022-07-29 Thread podunk
Hi,

 

you mean adding:

 

" 'csv.field-delimiter'=';', "

 

like:

 

    tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( "
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + " 'csv.field-delimiter'=';', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + "    )");
        
    tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

 

I did. Nothing new - still does not work.

 

 
 

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" 
To: "Weihua Hu" 
Cc: pod...@gmx.com, "user" 
Subject: Re: Why this example does not save anything to file?



Hello,
 

I may be really wrong with this, but from what I see in the source file, you are using a semicolon to separate the values.

This probably means that you should set the csv.field-delimiter to `;` to make your example work properly.
 

Have you tried with that configuration in your create table csv connector option?

 

Regards,

Gil

 



Re: Why this example does not save anything to file?

2022-07-29 Thread podunk
In Eclipse only:

 

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Flink/flink-1.14.4/lib/flink-dist_2.11-1.14.4.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

 

In Flink logs:

 

2022-07-29 22:04:15,835 INFO  org.apache.flink.api.java.typeutils.TypeExtractor    [] - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
2022-07-29 22:04:15,836 INFO  org.apache.flink.api.java.typeutils.TypeExtractor    [] - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-07-29 22:04:15,949 INFO  org.apache.flink.client.program.rest.RestClusterClient   [] - Submitting job 'insert-into_default_catalog.default_database.fs_table' (064e2e154028411712786c61a79221bf).
 

I do not think anything is wrong with these fields - they are correctly declared.

 


I see a similar issue: https://issues.apache.org/jira/browse/FLINK-12410

Is this a bug?

 

Sent: Tuesday, July 26, 2022 at 3:40 PM
From: "Weihua Hu" 
To: pod...@gmx.com
Cc: "user" 
Subject: Re: Why this example does not save anything to file?


Hi,

 

Can you see any exception logs? 

Where is this code running? is it a standalone cluster with one TaskManager?

 

 


Best,
Weihua





 



Re: Why this example does not save anything to file?

2022-07-26 Thread Gil De Grove
Hello,

I may be really wrong with this, but from what I see in the source file,
you are using a semicolon to separate the values.
This probably means that you should set the csv.field-delimiter to `;` to
make your example work properly.

Have you tried with that configuration in your create table csv connector
option?
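
As a sketch of what that could look like, assuming the newer option keys (`'connector'`, `'path'`, `'format'`) that the sink table in this thread already uses: the helper below only assembles the DDL string. `CsvTableDdl` and `createTable` are hypothetical names, not part of Flink.

```java
// Hypothetical helper: builds a CREATE TABLE statement for a CSV filesystem table.
// It only concatenates strings; nothing here calls Flink.
final class CsvTableDdl {

    static String createTable(String tableName, String columns, String path, char delimiter) {
        return "CREATE TABLE " + tableName + " (" + columns + ") WITH ("
                + "'connector'='filesystem', "
                + "'path'='" + path + "', "
                + "'format'='csv', "
                + "'csv.field-delimiter'='" + delimiter + "'"
                + ")";
    }

    public static void main(String[] args) {
        String ddl = createTable(
                "Table1",
                "column_name1 STRING, column_name2 DOUBLE",
                "file:///C:/temp/test4.txt",
                ';');
        // The resulting string would then be passed to tEnv.executeSql(ddl).
        System.out.println(ddl);
    }
}
```

Keeping the options in one place makes it harder to mix the older `connector.type` style keys with the newer ones in the same job.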

Regards,
Gil

On Tue, 26 Jul 2022 at 15:40, Weihua Hu  wrote:

> Hi,
>
> Can you see any exception logs?
> Where is this code running? is it a standalone cluster with one
> TaskManager?
>
>
> Best,
> Weihua
>
>


Re: Why this example does not save anything to file?

2022-07-26 Thread Weihua Hu
Hi,

Can you see any exception logs?
Where is this code running? Is it a standalone cluster with one TaskManager?


Best,
Weihua


On Tue, Jul 26, 2022 at 4:18 AM  wrote:

> If I get it correctly this is the way how I can save to CSV:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>
> So my code is (read from file, save to file):
>
>
> package flinkCSV;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class flinkCSV {
>
>     public static void main(String[] args) throws Exception {
>
>         //register and create table
>         EnvironmentSettings settings = EnvironmentSettings
>             .newInstance()
>             //.inStreamingMode()
>             .inBatchMode()
>             .build();
>
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>             .execute()
>             .print();
>
>         tEnv.executeSql("CREATE TABLE fs_table ("
>                 + "    column_nameA STRING, "
>                 + "    column_nameB DOUBLE "
>                 + "    ) WITH ( \n"
>                 + "    'connector'='filesystem', "
>                 + "    'path'='file:///C:/temp/test5.txt', "
>                 + "    'format'='csv', "
>                 + "  'sink.partition-commit.delay'='1 s', "
>                 + "  'sink.partition-commit.policy.kind'='success-file'"
>                 + "    )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>             .execute()
>             .print();
>     }
> }
>
> Source file (test4.txt) is:
>
> aa; 23
> bb; 657.9
> cc; 55
>
> test5.txt is not created, select from fs_table gives null
>
>


Why this example does not save anything to file?

2022-07-25 Thread podunk
If I understand correctly, this is how I can save to CSV:

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example

 

So my code is (read from file, save to file):

 

 


package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {

    public static void main(String[] args) throws Exception {

        
        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            //.inStreamingMode()
            .inBatchMode()
            .build();

        final TableEnvironment tEnv = TableEnvironment.create(settings);

        
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
            .execute()
            .print();

        tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( \n"
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + "  'sink.partition-commit.delay'='1 s', "
                + "  'sink.partition-commit.policy.kind'='success-file'"
                + "    )");

        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

        tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
            .execute()
            .print();
    }
}

 

Source file (test4.txt) is:

 

aa; 23
bb; 657.9
cc; 55

 

test5.txt is not created, and the SELECT from fs_table returns null.