AHuio opened a new issue, #5857:
URL: https://github.com/apache/iceberg/issues/5857
### Query engine
Flink: 1.13.5
Iceberg: 0.13.2
### Question
```sql
CREATE TABLE flink_dg (
  id INT,
  c1 VARCHAR
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100000',
  'fields.c1.length' = '12'
);

CREATE TABLE hive_iceberg.dhome_db.test_iceberg (
  id INT,
  c1 VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '5',
  'flink.rewrite.enable' = 'true',
  'flink.rewrite.parallelism' = '1',
  'flink.rewrite.target-file-size-bytes' = '536870912',
  'flink.rewrite.max-files-count' = '5'
);
```

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;

// 1. Insert into the Iceberg v2 table (streaming job).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// ...
tEnv.executeSql(
    "INSERT INTO hive_iceberg.dhome_db.test_iceberg "
        + "SELECT * FROM flink_dg LIMIT 1000000");

// 2. Compact the data files with the Flink rewrite action (separate batch job).
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
Map<String, String> hashMap = new HashMap<>();
hashMap.put("uri", "thrift://xxxxx:9083");
hashMap.put("clients", "5");
hashMap.put("property-version", "1");
hashMap.put("warehouse", "hdfs://nameservice1/user/hive/warehouse");
CatalogLoader hive_iceberg = CatalogLoader.hive("hive_iceberg", new Configuration(), hashMap);
Catalog catalog_iceberg = hive_iceberg.loadCatalog();
Table table_iceberg =
    catalog_iceberg.loadTable(TableIdentifier.of("dhome_db", "test_iceberg"));

Actions.forTable(table_iceberg)
    .rewriteDataFiles()
    .maxParallelism(5)
    .execute();

// 3. Expire every snapshot older than the current one (null check before first use).
Snapshot snapshot = table_iceberg.currentSnapshot();
if (snapshot != null) {
  long old = snapshot.timestampMillis();
  table_iceberg.expireSnapshots().expireOlderThan(old).cleanExpiredFiles(true).commit();
}
```
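The expiration step at the end of the code above uses the current snapshot's own timestamp as the cutoff. Below is a simplified, self-contained model of that step. It is not Iceberg's implementation. It assumes two rules: `expireOlderThan(t)` expires snapshots whose timestamp is strictly older than `t`, and the current snapshot is never expired. The `ExpireModel` and `Snap` names and the sample history (IDs, operation labels, timestamps) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of expireOlderThan(cutoff); NOT Iceberg's code.
 * Assumed rules: snapshots strictly older than the cutoff are expired,
 * and the current snapshot is always retained.
 */
public class ExpireModel {

    /** Hypothetical stand-in for org.apache.iceberg.Snapshot. */
    static final class Snap {
        final long id;
        final String operation;
        final long timestampMillis;

        Snap(long id, String operation, long timestampMillis) {
            this.id = id;
            this.operation = operation;
            this.timestampMillis = timestampMillis;
        }

        @Override
        public String toString() {
            return id + ":" + operation;
        }
    }

    /** Returns the snapshots that survive expiration with the given cutoff. */
    static List<Snap> expireOlderThan(List<Snap> history, long currentId, long cutoffMillis) {
        List<Snap> kept = new ArrayList<>();
        for (Snap s : history) {
            // The current snapshot is always kept; others only if not older than the cutoff.
            if (s.id == currentId || s.timestampMillis >= cutoffMillis) {
                kept.add(s);
            }
        }
        return kept;
    }

    /** Illustrative history: two streaming commits followed by the rewrite commit. */
    static List<Snap> sampleHistory() {
        return Arrays.asList(
                new Snap(1L, "append", 1_000L),
                new Snap(2L, "overwrite", 6_000L),
                new Snap(3L, "replace", 60_000L)); // stands in for the rewriteDataFiles() commit
    }

    public static void main(String[] args) {
        List<Snap> history = sampleHistory();
        Snap current = history.get(history.size() - 1);
        // Same cutoff as the issue's code: the current snapshot's own timestamp.
        System.out.println(expireOlderThan(history, current.id, current.timestampMillis));
    }
}
```

Under these assumptions, `main` prints `[3:replace]`. If the rewrite committed a snapshot, it should be the only one left after this expiration call. The earlier streaming-commit snapshots are removed.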
I tested small-file compaction with the code above. The data files were compacted successfully (at 14:56), but no corresponding snapshot file was generated.
<img width="985" alt="iceberg-20220926"
src="https://user-images.githubusercontent.com/20868410/192242773-05f95903-8ead-479d-965c-d6df55c04145.PNG">
<img width="1068" alt="iceberg-20220926-2"
src="https://user-images.githubusercontent.com/20868410/192243302-7bca6af4-52fd-4483-ab7c-2bdcaa77c62b.PNG">
Is the way I compact files with Flink wrong, or does this method simply not generate a corresponding snapshot file?
Thanks.
--
This is an automated message from the Apache Git Service.