Samrat002 commented on PR #21458: URL: https://github.com/apache/flink/pull/21458#issuecomment-1703902505
I have taken an example where a datagen table is created with 2 fields `fname` and `lname`. Also created another table which is of type filesystem and points to a specfic s3 path and format used is `csv`. ``` -- create a genertor table CREATE TABLE generator ( fname STRING, lname STRING ) WITH ( 'connector' = 'datagen' ); -- create a sample dynamic table with connector filesystem. It supports csv as format. CREATE TABLE `name_table` ( `fname` STRING, `lname` STRING ) with ( 'connector'='filesystem', 'format' = 'csv', 'path' = 's3://dbsamrat-flink-dev/data/default/name_table' ); -- run a job to insert data in table (s3) insert into name_table select * from generator; ``` Here is the below flink-conf file used for the cluster (also these configs are picked in job ) Attaching the jobmanager log for insertion of data in csvformated s3 path which uses CsvBulkWriter and maintains 2 phase commit. [jobmanager.log](https://github.com/apache/flink/files/12504473/insert_jobmanager.log) It can be noted that 2 phase commit is happening at checkpoint trigger. Additional job executed seperately to read data from name_table. [count_jobmanager.log](https://github.com/apache/flink/files/12504475/count_jobmanager.log) @dannycranmer @hlteoh37 please review if this satisfy the guarentee for exactly once . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org