[ https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lyn Zhang updated FLINK-33670:
------------------------------
    Description: 
Dear all:

I found that some common operators cannot be reused when submitting a job with
multiple sinks. Here is an example:
{code:sql}
CREATE TABLE source (
    id  STRING,
    ts  TIMESTAMP(3),
    v   BIGINT,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
    'connector' = 'socket',
    'hostname' = 'localhost',
    'port' = '9999',
    'byte-delimiter' = '10',
    'format' = 'json'
);

CREATE VIEW source_distinct AS
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER w AS row_nu
    FROM source
    WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
) WHERE row_nu = 1;

CREATE TABLE print1 (
    id  STRING,
    ts  TIMESTAMP(3)
) WITH ('connector' = 'blackhole');

INSERT INTO print1 SELECT id, ts FROM source_distinct;

CREATE TABLE print2 (
    id  STRING,
    ts  TIMESTAMP(3),
    v   BIGINT
) WITH ('connector' = 'blackhole');

INSERT INTO print2
SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
FROM source_distinct
GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id;
{code}
!image-2023-11-28-14-31-30-153.png|width=384,height=145!

I checked the code: Flink adds the rule CoreRules.PROJECT_MERGE by default.
This rule produces different rel digests for the deduplicate operator in the two
sink plans, so the planner fails to match it as a common operator.

In real production environments, reusing common operators such as deduplicate is
more valuable than merging projections. A good solution would be to stop project
merging across shuffle operators in multi-sink cases.
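The effect described above can be illustrated with a toy model. This is a sketch only, not Flink's planner code: plan nodes are compared by a structural digest (in the spirit of a Calcite rel digest), a hypothetical narrow() rewrite folds each sink's required columns into the nodes beneath it as a simplified stand-in for PROJECT_MERGE and related rules, and an optional flag stops that rewrite at the shuffling deduplicate node, as proposed above. All class, record and method names here are hypothetical.

```java
import java.util.List;

public class DigestReuseSketch {
    // Toy plan node (hypothetical): operator name, output columns, whether the node
    // introduces a shuffle, and a single input (null for a source scan). For brevity
    // the shuffle flag sits on the Deduplicate node itself, not on a separate Exchange.
    record Node(String op, List<String> cols, boolean shuffles, Node input) {
        // Structural digest: two subtrees are treated as reusable only if equal.
        String digest() {
            return op + cols + (input == null ? "" : " <- " + input.digest());
        }
    }

    // Toy rewrite (hypothetical, a simplification of project merging): fold a sink's
    // required columns into the nodes below it. Assumes `needed` already contains any
    // keys those nodes require. With stopAtShuffle, a shuffling node and everything
    // beneath it are left untouched.
    static Node narrow(Node n, List<String> needed, boolean stopAtShuffle) {
        if (n.input() == null || (stopAtShuffle && n.shuffles())) {
            return n; // scans and (optionally) shuffle boundaries stay unchanged
        }
        return new Node(n.op(), needed, n.shuffles(), narrow(n.input(), needed, stopAtShuffle));
    }

    // Builds one sink's rewritten plan on top of the shared view.
    static Node sinkPlan(String op, List<String> needed, Node view, boolean stopAtShuffle) {
        return new Node(op, needed, false, narrow(view, needed, stopAtShuffle));
    }

    // True if the deduplicate subtree has the same digest under both sinks.
    static boolean dedupReusable(boolean stopAtShuffle) {
        Node scan = new Node("Scan", List.of("id", "ts", "v"), false, null);
        Node dedup = new Node("Deduplicate(id)", List.of("id", "ts", "v"), true, scan);
        Node p1 = sinkPlan("Sink(print1)", List.of("id", "ts"), dedup, stopAtShuffle);
        Node p2 = sinkPlan("Sink(print2)", List.of("id", "ts", "v"), dedup, stopAtShuffle);
        // The direct input of each sink node is that sink's version of the deduplicate.
        return p1.input().digest().equals(p2.input().digest());
    }

    public static void main(String[] args) {
        System.out.println("reusable with merge across shuffle: " + dedupReusable(false));
        System.out.println("reusable when stopping at shuffle:  " + dedupReusable(true));
    }
}
```

In this toy model, main prints false for the unrestricted rewrite (print1's deduplicate outputs [id, ts] while print2's outputs [id, ts, v]) and true when the rewrite stops at the shuffle. The trade-off it shows is the one raised in this issue: blocking the merge keeps the deduplicate subtree identical across sinks so it can be shared, at the cost of no longer pruning v from print1's branch.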

What do you think? Looking forward to your reply.


> Public operators cannot be reused in multi sinks
> ------------------------------------------------
>
>                 Key: FLINK-33670
>                 URL: https://issues.apache.org/jira/browse/FLINK-33670
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Lyn Zhang
>            Priority: Major
>         Attachments: image-2023-11-28-14-31-30-153.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
