AHuio opened a new issue, #5857:
URL: https://github.com/apache/iceberg/issues/5857

   ### Query engine
   
   Flink: 1.13.5
   iceberg:0.13.2
   
   ### Question
   
   `CREATE TABLE flink_dg (
       id INT,c1 VARCHAR
   )
   WITH (
       'connector' = 'datagen',
       'rows-per-second' = '100000',
       'fields.c1.length' = '12'
     ); 
   
   create table iceberg_hive_catalog.dhome_db.test_iceberg(
    id INT,c1 VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
   ) with (
    'format-version'='2',
    'write.upsert.enabled'='true',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.metadata.previous-versions-max'='5',
    'flink.rewrite.enable' = 'true',
    'flink.rewrite.parallelism' = '1',
    'flink.rewrite.target-file-size-bytes' = '536870912',
    'flink.rewrite.max-files-count' = '5'
   );
   
   -- insert into iceberg v2 table.
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
   
   env.enableCheckpointing(5000, EXACTLY_ONCE);
   ...
   tEnv.executeSql("
   insert into hive_iceberg.dhome_db.test_iceberg
   select * from flink_dg limit 1000000");
   
   -- flink compact data file
         EnvironmentSettings env = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
           Map hashMap = new HashMap();
           hashMap.put("uri","thrift://xxxxx:9083");
           hashMap.put("clients","5");
           hashMap.put("property-version","1");
           hashMap.put("warehouse","hdfs://nameservice1/user/hive/warehouse");
   
           CatalogLoader hive_iceberg = CatalogLoader.hive("hive_iceberg", new 
Configuration(), hashMap);
           Catalog catalog_iceberg = hive_iceberg.loadCatalog();
           Table table_iceberg = 
catalog_iceberg.loadTable(TableIdentifier.of("dhome_db","test_iceberg"));
   
           Actions.forTable(table_iceberg)
                   .rewriteDataFiles()
                   .maxParallelism(5)
                   .execute();
   
           Snapshot snapshot = table_iceberg.currentSnapshot();
           long old = snapshot.timestampMillis();
           if (snapshot != null) {
               
table_iceberg.expireSnapshots().expireOlderThan(old).cleanExpiredFiles(true).commit();
           }
   `
   
   I tested the small file compression process through the above code, and 
found that the file was successfully compressed(at time 14:56), but the 
corresponding snapshot file was not generated.
   
   <img width="985" alt="iceberg-20220926" 
src="https://user-images.githubusercontent.com/20868410/192242773-05f95903-8ead-479d-965c-d6df55c04145.PNG";>
   <img width="1068" alt="iceberg-20220926-2" 
src="https://user-images.githubusercontent.com/20868410/192243302-7bca6af4-52fd-4483-ab7c-2bdcaa77c62b.PNG";>
   
   
   Do you want to consult whether the method of Flink compressed files is wrong 
or that the method will not generate the corresponding snapshot file?
   
   thanks.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to