[ https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lyn Zhang updated FLINK-33670:
------------------------------
    Description:
Dear all:

I find that some common operators cannot be reused when a job is submitted with multiple sinks. Here is an example:
{code:java}
CREATE TABLE source (
  id STRING,
  ts TIMESTAMP(3),
  v BIGINT,
  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'json'
);

CREATE VIEW source_distinct AS
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER w AS row_nu
  FROM source
  WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
) WHERE row_nu = 1;

CREATE TABLE print1 (
  id STRING,
  ts TIMESTAMP(3)
) WITH('connector' = 'blackhole');

INSERT INTO print1 SELECT id, ts FROM source_distinct;

CREATE TABLE print2 (
  id STRING,
  ts TIMESTAMP(3),
  v BIGINT
) WITH('connector' = 'blackhole');

INSERT INTO print2
SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
FROM source_distinct
GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id;
{code}
!image-2023-11-28-14-31-30-153.png|width=384,height=145!

I checked the code: Flink applies the CoreRules.PROJECT_MERGE rule by default. This produces different rel digests for the deduplicate operator, so matching common operators ultimately fails.

In a real production environment, reusing common operators such as deduplicate is worth more than merging projections. A good solution would be to stop project merge from crossing shuffle operators in multi-sink cases.

What do you think? Looking forward to your reply.

> Public operators cannot be reused in multi sinks
> ------------------------------------------------
>
>                 Key: FLINK-33670
>                 URL: https://issues.apache.org/jira/browse/FLINK-33670
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Lyn Zhang
>            Priority: Major
>         Attachments: image-2023-11-28-14-31-30-153.png
>
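
One way to check whether the deduplicate operator from the example is actually shared is to explain both inserts as a single statement set and read the optimized execution plan. The following is only a sketch: it assumes the tables and the source_distinct view from the description are already registered in a Flink 1.18 SQL Client session. The option table.optimizer.reuse-sub-plan-enabled controls sub-plan reuse and is enabled by default; it is set here only to make the assumption explicit.
{code:sql}
-- Sub-plan reuse is on by default; stated explicitly for clarity.
SET 'table.optimizer.reuse-sub-plan-enabled' = 'true';

-- Explain both sinks together so the planner can look for common sub-plans.
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO print1 SELECT id, ts FROM source_distinct;
  INSERT INTO print2
  SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
  FROM source_distinct
  GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id;
END;
{code}
If the deduplicate node is listed once and the second sink refers back to it (typically through a Reused(reference_id=[...]) entry in the optimized execution plan), the operator was reused. If the node appears separately under each sink, reuse did not happen, which is the behavior reported in this issue.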