zhengyuan created FLINK-32861: --------------------------------- Summary: Batch mode, sink to multables the task graph of web page monitor jobs , lack sink task graph block Key: FLINK-32861 URL: https://issues.apache.org/jira/browse/FLINK-32861 Project: Flink Issue Type: Bug Components: Project Website Affects Versions: 1.16.3 Environment: flink 1.16
centos 7 64 mysql 5.7 paimon 0.5 open jdk 1.8 64 Reporter: zhengyuan Attachments: flink-mult-out.sql, screenshot.png Batch mode, sink to multables (mysql, paimon) the task graph of web page monitor jobs , lack sink task graph block. Expect 2 sink Task graphs . but the results is correct. flink sql detail see attachements flink-mult-out.sql and screenshot. flink-mult-out.sql: ====================== SET execution.checkpointing.interval=10000; SET state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/20230814103606840; SET execution.runtime-mode=batch; CREATE TABLE source_jdbc_9kT_QLyGtM( `id` BIGINT, `tenant_code` STRING, `ces2` STRING, `ces1` STRING, `address` STRING, `amount2` FLOAT, `bizdate` DATE) WITH ( 'connector'='jdbc', 'scan.fetch-size'='300000', 'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai', 'username'='xxx', 'password'='xxxx', 'table-name'='v_csmx_129_default' ); CREATE VIEW tranform_sql_mapping AS select `id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from source_jdbc_9kT_QLyGtM where ( `id`<50000 ); CREATE TABLE data_processing_out_1( `id` BIGINT, `tenant_code` STRING, `ces2` STRING, `ces1` STRING, `address` STRING, `amount2` FLOAT, `bizdate` DATE) WITH ( 'connector'='jdbc', 'sink.buffer-flush.max-rows'='50000', 'sink.buffer-flush.interval'='0', 'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai', 'username'='xxxx', 'password'='xxxxx', 'table-name'='data_processing_out_1' ); INSERT INTO data_processing_out_1 select `id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from tranform_sql_mapping; CREATE CATALOG paimon WITH ( 'type' = 'paimon', 'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods' ); USE CATALOG paimon; create database if not exists paimon.paimon_ods_db; drop table if exists paimon_ods_db.paimon_mysql_test01; CREATE TABLE if not exists paimon_ods_db.paimon_mysql_test01( `id` BIGINT, `tenant_code` STRING, `ces2` STRING, `ces1` STRING, `address` STRING, `amount2` FLOAT, `bizdate` DATE ) WITH ( 'sink.parallelism'='8', 'bucket'='8', 'bucket-key'='tenant_code', 'sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='512MB', 'num-sorted-run.compaction-trigger'='20', 'write-buffer-size'='1024MB', 'write-buffer-spillable'='true', 'write-mode'='append-only' ); INSERT INTO paimon_ods_db.paimon_mysql_test01 select `id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from default_catalog.default_database.tranform_sql_mapping; ================================================== results ====================== trino> use paimon.paimon_ods_db; USE trino:paimon_ods_db> select count(*) from paimon_mysql_test01; _col0 ------- 49999 (1 row) mysql> SELECT count(*) FROM data_storage.data_processing_out_1; +----------+ | count(*) | +----------+ | 49999 | +----------+ 1 row in set (0.06 sec) mysql> -- This message was sent by Atlassian Jira (v8.20.10#820010)