Samrat002 commented on PR #21458:
URL: https://github.com/apache/flink/pull/21458#issuecomment-1703902505

   I tested with an example in which a datagen table is created with two fields, 
`fname` and `lname`, and a second table of type filesystem points to a 
specific S3 path and uses the `csv` format. 
   
   ```
    -- create a generator table
   CREATE TABLE generator (
       fname STRING,
       lname STRING
   ) WITH (
     'connector' = 'datagen'
     
   );
   
   -- create a sample dynamic table with the filesystem connector; it supports
   -- csv as a format
   CREATE TABLE `name_table` (
     `fname` STRING,
     `lname` STRING
   ) with (
   'connector'='filesystem',
   'format' = 'csv',
   'path' = 's3://dbsamrat-flink-dev/data/default/name_table'
   );
   
   -- run a job to insert data in table (s3)
   insert into name_table select * from generator;
   ```
   
   Below is the flink-conf file used for the cluster (the job also picks up 
these configs):
   
    
   
   
   Attaching the jobmanager log for the insertion of data into the CSV-formatted 
S3 path, which uses CsvBulkWriter and maintains a two-phase commit. 
   
[jobmanager.log](https://github.com/apache/flink/files/12504473/insert_jobmanager.log)
   
   
   Note that the two-phase commit happens when a checkpoint is triggered. 
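   
   Because the commit is tied to checkpoints, checkpointing must be enabled for 
the written files to be finalized. A minimal sketch of enabling it from the 
SQL client, assuming a 10-second interval (the interval actually used in this 
run is not shown in this comment):
   
   ```sql
   -- Assumption: 10s is an illustrative value, not the one used for the attached logs.
   -- 'execution.checkpointing.interval' is the standard Flink checkpoint interval option.
   SET 'execution.checkpointing.interval' = '10s';
   ```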
   
   An additional job was executed separately to read data from name_table. 
   
[count_jobmanager.log](https://github.com/apache/flink/files/12504475/count_jobmanager.log)
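   
   Judging by the log file name, the read job was likely a row count over the 
table. A sketch of such a query, given as an assumption rather than the exact 
statement that was run (`row_count` is a hypothetical alias):
   
   ```sql
   -- Assumption: a plain count over the filesystem table defined above.
   SELECT COUNT(*) AS row_count FROM name_table;
   ```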
   
   
   @dannycranmer @hlteoh37 please review whether this satisfies the exactly-once 
guarantee. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
