VidakM opened a new issue, #9948:
URL: https://github.com/apache/iceberg/issues/9948
### Query engine
Flink 1.17.2 with Iceberg 1.4.2 libraries
### Question
I have a few Iceberg v2 tables defined and a Flink job that reads them in a
streaming fashion and transforms them into another Iceberg table.
If the source tables are basic (no upsert), subscribing to them works great and the
SQL query runs continuously.
But if the tables are defined with `'write.upsert.enabled'='true'`, the
subscribing Flink SQL reads only once and does not react to new snapshots, even
when the SQL hints ask it to monitor at an interval and the starting strategy is
any of the incremental variants.
Flink streaming query that normally works:
```sql
INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress`  AS `address`,
  ord.`item_id` AS `item_id`,
  ....
FROM
  iceberg.source_users /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ usr
JOIN
  iceberg.source_orders /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ ord
  ON usr.`user_id` = ord.`user_id`;
```
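The incremental starting strategies mentioned above were set through the `starting-strategy` read option. A sketch of what I tried (option name and value taken from the Iceberg Flink read-options docs; the exact spelling for this version is an assumption here):
```sql
-- Same join, but with an explicit incremental starting strategy.
-- 'starting-strategy' and INCREMENTAL_FROM_LATEST_SNAPSHOT come from the
-- Iceberg Flink read options; treat this as a sketch, not an exact copy.
INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress`  AS `address`,
  ord.`item_id` AS `item_id`
FROM
  iceberg.source_users /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s',
    'starting-strategy'='INCREMENTAL_FROM_LATEST_SNAPSHOT') */ usr
JOIN
  iceberg.source_orders /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s',
    'starting-strategy'='INCREMENTAL_FROM_LATEST_SNAPSHOT') */ ord
  ON usr.`user_id` = ord.`user_id`;
```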
The streaming join works great if the source Iceberg tables are defined like
this:
```sql
CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('format-version'='2');
```
Resulting table properties example:
```
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]
```
But the streaming join runs only once and then stops triggering on new
snapshots if the source tables are instead defined like this (the job does not
finish; it just stops reacting to the source and produces no new records):
```sql
CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('format-version'='2', 'write.upsert.enabled'='true');
```
Resulting table properties example:
```
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]
```
In my Flink job I simply define the connector and run the SQL join/insert;
both the source and target tables are already defined.
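For reference, the setup is essentially just a catalog definition followed by the statement above. A minimal sketch (the catalog type, URI and warehouse below are placeholders, not my actual configuration):
```sql
-- Minimal sketch of the job setup; catalog properties are placeholders.
CREATE CATALOG iceberg WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://metastore:9083',
  'warehouse' = 's3://warehouse/path'
);

-- The streaming INSERT ... SELECT shown above is then submitted as-is.
```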
I also noticed that a SQL join stops streaming too if at least one of the
joined tables has upsert enabled.
Looking at the documentation for both Iceberg and Flink, I don't find any
indication that enabling upsert should change this behaviour, but I do remember
reading somewhere that the FLIP-27 source only supports appends and not
updates/deletes. Is that the reason I'm seeing this behaviour?
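In case it helps narrow this down, one way to see what kind of snapshots the upsert writer commits would be to query the snapshots metadata table (assuming metadata tables are queryable from Flink in this setup; I have not confirmed that for 1.4.2):
```sql
-- Sketch: list recent snapshots of the upsert table and their operation.
-- If the upsert writer commits 'overwrite' / delete-carrying snapshots instead
-- of plain 'append' snapshots, an append-only incremental reader would have
-- nothing it can consume. Assumes `$snapshots` metadata tables work from Flink.
SELECT
  committed_at,
  snapshot_id,
  operation,
  summary
FROM iceberg.`source_users$snapshots`
ORDER BY committed_at DESC;
```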