In Eclipse only:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Flink/flink-1.14.4/lib/flink-dist_2.11-1.14.4.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
In Flink logs:
2022-07-29 22:04:15,835 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
2022-07-29 22:04:15,836 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-07-29 22:04:15,949 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'insert-into_default_catalog.default_database.fs_table' (064e2e154028411712786c61a79221bf).
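For context, the TypeExtractor INFO lines only mean Flink falls back to generic (Kryo) serialization for that class; they are informational, not errors. A minimal sketch of the rules the POJO check applies (class and field names here are illustrative, not from my code):

```java
// Illustrative sketch of Flink's POJO requirements: a public class with a
// public no-argument constructor, where every field is either public or
// exposed through a matching getter/setter pair.
public class ValidPojo {
    public String columnName1;   // public field: valid as-is

    private double columnName2;  // private field: needs getter AND setter

    public ValidPojo() {}        // public no-arg constructor is required

    public double getColumnName2() { return columnName2; }
    public void setColumnName2(double v) { this.columnName2 = v; }

    public static void main(String[] args) {
        ValidPojo p = new ValidPojo();
        p.columnName1 = "aa";
        p.setColumnName2(23.0);
        System.out.println(p.columnName1 + ";" + p.getColumnName2());
    }
}
```

A class missing any of these (e.g. a private field with no setter, as the log reports for modificationTime) is still usable, just serialized as a GenericType.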
I do not think anything is wrong with these fields; they are declared correctly.
I see a similar issue: https://issues.apache.org/jira/browse/FLINK-12410
Is this a bug?
Sent: Tuesday, July 26, 2022 at 3:40 PM
From: "Weihua Hu" <[email protected]>
To: [email protected]
Cc: "user" <[email protected]>
Subject: Re: Why this example does not save anything to file?
Hi,
Can you see any exception logs?
Where is this code running? Is it a standalone cluster with one TaskManager?
Best,
Weihua
On Tue, Jul 26, 2022 at 4:18 AM <[email protected]> wrote:
If I get it correctly, this is the way to save to CSV. So my code is (read from file, save to file):

package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {
    public static void main(String[] args) throws Exception {
//register and create table
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
            .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
.execute()
.print();
tEnv.executeSql("CREATE TABLE fs_table ("
+ " column_nameA STRING, "
+ " column_nameB DOUBLE "
+ " ) WITH ( \n"
+ " 'connector'='filesystem', "
+ " 'path'='file:///C:/temp/test5.txt', "
+ " 'format'='csv', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file'"
+ " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
.execute()
.print();
}
    }
}

Source file (test4.txt) is:

aa; 23
bb; 657.9
cc; 55

test5.txt is not created, and SELECT from fs_table returns null.
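One thing I notice in my own code: Table1 is declared with the legacy properties ('connector.type', 'connector.path', 'format.type') while fs_table uses the unified keys ('connector', 'path', 'format'). A sketch of the source DDL rewritten with the unified options, in case the mix is the problem (this is an assumption on my part, not verified; the 'csv.field-delimiter' option is there because my file uses ';' as separator):

```sql
-- Same source table, declared with the unified connector options
-- that fs_table already uses.
CREATE TABLE Table1 (
  column_name1 STRING,
  column_name2 DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///C:/temp/test4.txt',
  'format' = 'csv',
  'csv.field-delimiter' = ';'
);
```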
