zhengyuan created FLINK-32861:
---------------------------------

             Summary: Batch mode, sink to multables the task graph of web page 
monitor jobs , lack sink task graph block
                 Key: FLINK-32861
                 URL: https://issues.apache.org/jira/browse/FLINK-32861
             Project: Flink
          Issue Type: Bug
          Components: Project Website
    Affects Versions: 1.16.3
         Environment: flink 1.16

centos 7 64

mysql 5.7

paimon 0.5

open jdk 1.8 64
            Reporter: zhengyuan
         Attachments: flink-mult-out.sql, screenshot.png

Batch mode, sink to multables (mysql, paimon) the task graph of web page 
monitor jobs , lack sink task graph block. Expect 2 sink Task graphs . but the 
results is correct.


flink sql detail see attachements flink-mult-out.sql and screenshot.

flink-mult-out.sql:

======================

SET execution.checkpointing.interval=10000;
SET 
state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/20230814103606840;
SET execution.runtime-mode=batch;
CREATE TABLE source_jdbc_9kT_QLyGtM(
`id` BIGINT,
`tenant_code` STRING,
`ces2` STRING,
`ces1` STRING,
`address` STRING,
`amount2` FLOAT,
`bizdate` DATE)  WITH ( 
    'connector'='jdbc',
    'scan.fetch-size'='300000',
    
'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'='xxx',
    'password'='xxxx',
    'table-name'='v_csmx_129_default'
);

 

CREATE VIEW tranform_sql_mapping AS select 
`id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from 
source_jdbc_9kT_QLyGtM where  ( `id`<50000 );


CREATE TABLE data_processing_out_1(
`id` BIGINT,
`tenant_code` STRING,
`ces2` STRING,
`ces1` STRING,
`address` STRING,
`amount2` FLOAT,
`bizdate` DATE)  WITH ( 
    'connector'='jdbc',
    'sink.buffer-flush.max-rows'='50000',
    'sink.buffer-flush.interval'='0',
    
'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'='xxxx',
    'password'='xxxxx',
    'table-name'='data_processing_out_1'
);

INSERT INTO data_processing_out_1 select 
`id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from 
tranform_sql_mapping;

CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods'   
);

USE CATALOG paimon;
create database if not exists paimon.paimon_ods_db;
drop table if exists paimon_ods_db.paimon_mysql_test01;
CREATE TABLE if not exists paimon_ods_db.paimon_mysql_test01(
`id` BIGINT,
`tenant_code` STRING,
`ces2` STRING,
`ces1` STRING,
`address` STRING,
`amount2` FLOAT,
`bizdate` DATE
)  WITH (
   'sink.parallelism'='8',
   'bucket'='8',
   'bucket-key'='tenant_code',
   'sink.use-managed-memory-allocator'='true',
   'sink.managed.writer-buffer-memory'='512MB',
   'num-sorted-run.compaction-trigger'='20',
   'write-buffer-size'='1024MB',
   'write-buffer-spillable'='true',
   'write-mode'='append-only'
);

INSERT INTO paimon_ods_db.paimon_mysql_test01 select 
`id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from 
default_catalog.default_database.tranform_sql_mapping;

 

==================================================

results

======================

trino> use paimon.paimon_ods_db;
USE
trino:paimon_ods_db> select count(*) from paimon_mysql_test01;
 _col0 
-------
 49999 
(1 row)

 

mysql> SELECT count(*) FROM data_storage.data_processing_out_1;
+----------+
| count(*) |
+----------+
|    49999 |
+----------+
1 row in set (0.06 sec)

mysql> 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to