13535048320 opened a new issue, #9363:
URL: https://github.com/apache/iceberg/issues/9363

   ### Query engine
   
   Write: Flink
   Read: Trino
   
   ### Question
   
   When the table is written with Flink upsert mode, reading it is very slow: a query over only 5 million rows takes about 1 minute. With upsert mode disabled, queries are very fast: 80 million rows are read in about 3 seconds.
   
   Version:
   Flink 1.16.0
   Iceberg 1.3.1
   Hive metastore standalone 3.1.3
   
   Flink SQL
   ```
   CREATE CATALOG hive WITH (
     'type'='iceberg',
     'catalog-type'='hive',
     'uri'='thrift://hive:9083',
     'warehouse'='s3a://warehouse',
     's3.endpoint'='http://minio:9000',
     's3.access-key-id'='user',
     's3.secret-access-key'='pass',
     'client.region'='us-east-1',
     'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
     'clients'='3',
     'property-version'='1'
   );
   
   create table if not exists hive.ods.ausp (
        objek string,
        atinn string,
        atzhl string,
        mafid string,
        klart string,
        adzhl string,
        atflv float,
        dwd_data_time TIMESTAMP(3),
        PRIMARY KEY(objek,atinn,atzhl,mafid,klart,adzhl) NOT ENFORCED
   )
   with (
     'format-version'='2',
     'write.format.default'='parquet',
     'write.upsert.enabled'='true'
   );
   
   create table source_ausp
   ( 
    `schema` STRING comment 'table schema',
    `payload` ROW(
        OBJEK string,
        ATINN string, 
        ATZHL string, 
        MAFID string, 
        KLART string, 
        ADZHL string, 
        ATFLV float
        ) comment 'row data'
   ) with (
   'connector' = 'kafka',
   'topic' = 'AUSP',
   'properties.group.id' = 'source_ausp',
   'scan.startup.mode' = 'group-offsets',
   'properties.bootstrap.servers' = 'kafka:9071',
   'properties.auto.offset.reset' = 'earliest',
   'properties.security.protocol' = 'SASL_PLAINTEXT',
   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
   'properties.sasl.mechanism' = 'PLAIN',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true',
   'sink.parallelism' = '3'
   );
   
   insert into hive.ods.ausp(objek, atinn, atzhl, mafid, klart, adzhl, atflv, dwd_data_time)
   select payload.OBJEK, payload.ATINN, payload.ATZHL, payload.MAFID, payload.KLART, payload.ADZHL, payload.ATFLV,
          cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as TIMESTAMP(3)) as DWD_DATA_TIME
   from source_ausp;
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

