dll02 opened a new issue, #10453:
URL: https://github.com/apache/iceberg/issues/10453
### Apache Iceberg version
1.4.2
### Query engine
Flink
### Please describe the bug 🐞
I am using the AWS Managed Service for Apache Flink to read data from Iceberg tables on a Glue catalog for real-time streaming joins. However, when monitoring the real-time ingestion of new data, I have hit an issue with tables that have unique key constraints (a non-enforced primary key): the new data cannot be read properly.

Flink version: 1.18 and 1.16
Iceberg version: 1.4.2
Table DDL:
```sql
CREATE CATALOG pro_catalog WITH (
  'type'='iceberg',
  'warehouse'='s3://xxxx-data/iceberg-data/',
  'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
);

CREATE TABLE `pro_catalog`.`dim`.`dim_xxx_st` (
  `col_1` VARCHAR NOT NULL,
  `col_2` BIGINT NOT NULL,
  `col_3` TIMESTAMP(6),
  `col_4` DECIMAL(38, 15),
  `col_5` VARCHAR NOT NULL,
  PRIMARY KEY (`col_1`, `col_5`, `col_2`) NOT ENFORCED
) PARTITIONED BY (`col_5`)
WITH (
  'write.parquet.row-group-size-bytes' = '33554432',
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'zstd',
  'write.target-file-size-bytes' = '67108864',
  'write.distribution-mode' = 'hash',
  'read.split.target-size' = '33554432'
)
```
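Since the non-enforced primary key makes Iceberg write row-level (equality) deletes on upsert, it may help to confirm what each new commit actually contains. As a sketch (Spark SQL syntax assumed, not Flink; the table name follows the DDL above), the `snapshots` metadata table shows the operation and delete counts per commit:

```sql
-- Sketch (Spark SQL): inspect recent commits on the table.
-- 'overwrite' operations carrying equality deletes are typical for PK upserts.
SELECT snapshot_id,
       operation,
       summary['total-equality-deletes'] AS total_eq_deletes
FROM pro_catalog.dim.dim_xxx_st.snapshots
ORDER BY committed_at DESC;
```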
Java code:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkCatalog;

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
    env2.setParallelism(40);
    env2.setMaxParallelism(40);
    final StreamTableEnvironment tenv = StreamTableEnvironment.create(env2);
    tenv.getConfig().getConfiguration().setString("execution.type", "streaming");

    String database = "sink";
    String catalogName = "pro_catalog";
    String warehousePath = "s3://xxxxx/";
    String catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
    String ioImpl = "org.apache.iceberg.aws.s3.S3FileIO";

    // Load the GlueCatalog and register it with the table environment
    Configuration hadoopConf = new Configuration(false);
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put("warehouse", warehousePath);
    catalogProperties.put("catalog-impl", catalogImpl);
    catalogProperties.put("io-impl", ioImpl);
    CatalogLoader catalogLoader =
        CatalogLoader.custom(catalogName, catalogProperties, hadoopConf, catalogImpl);
    Catalog flinkCatalog =
        new FlinkCatalog(catalogName, database, Namespace.empty(), catalogLoader, true, -1);
    tenv.registerCatalog(catalogName, flinkCatalog);
    tenv.useCatalog(catalogName);

    // Streaming read with the monitor source, polling for new snapshots every 60s
    String priceSql =
        "SELECT * FROM pro_catalog.dim.dim_xxxxx "
            + "/*+ OPTIONS('streaming'='true', 'monitor-interval'='60s') */";
    Table sourceTable = tenv.sqlQuery(priceSql);
    DataStream<Row> resultStream = tenv.toAppendStream(sourceTable, Row.class);
    resultStream.print();
    env2.execute("Print Flink SQL query result");
}
```
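For completeness, the same hint set also accepts an explicit starting point. A minimal variant of the query (option names per the Iceberg Flink read options; the snapshot id is a placeholder, not a value from this table) pins the streaming read to a known snapshot instead of the current one:

```sql
SELECT * FROM pro_catalog.dim.dim_xxxxx
/*+ OPTIONS('streaming'='true',
            'monitor-interval'='60s',
            'start-snapshot-id'='1234567890123456789') */;
```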
I cannot read the real-time data from the incremental files: only the initial rows are printed, and afterwards the monitor source just keeps refreshing the table metadata. The log looks like this:
```
14> +I[dao, 1717576320, 2024-06-05T08:32, 0.730100000000000, 2024-06-05]
2024-06-05 18:23:16,972 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00013-9e576a92-20df-41ca-a35b-c1041098711a.metadata.json
2024-06-05 18:24:18,911 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00014-3df59b12-76d6-4e19-9f4c-5656d8b491ec.metadata.json
2024-06-05 18:25:20,580 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00015-6eef0b39-60da-4c1c-80b5-06095fd084a4.metadata.json
2024-06-05 18:26:22,788 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00016-8ea0e74d-9676-4f49-9c57-568828eaa97b.metadata.json
2024-06-05 18:27:24,538 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://pond-warehouse/dev.db/dim_main_token_price_st/metadata/00017-a5311b9d-0eea-4631-9130-2f2aa3787023.metadata.json
```
I am not sure whether this issue stems from AWS or from Iceberg. Could you help me investigate how to resolve it?
Thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]