[ 
https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fu Kai updated FLINK-22113:
---------------------------
    Description: 
Hi team,
  
 We have a use case that joins multiple data sources to generate a 
continuously updated view. We defined a primary key constraint on every input 
source, and all the keys are subsets of the join condition columns. All joins 
are left joins.
  
 In our case, the first two inputs produce a *JoinKeyContainsUniqueKey* input 
spec, which is good and performant. The third input source, however, is joined 
with the intermediate output table of the first two input tables, and that 
intermediate table does not carry key constraint information (although the 
third source input table does), so the join gets a *NoUniqueKey* input spec. 
Given that NoUniqueKey inputs have dramatic performance implications per the 
[Force Join Unique 
Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
 email thread, we would like to know whether there is any mitigation for this.
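
To make the three input specs concrete, here is a minimal Python sketch of how one side of a join could be classified from its known unique keys and the join key. It is an illustrative model only, not the planner's code; the function name `classify_input_spec` and the set-based representation are assumptions made for this example. The sample inputs mirror the first join in the example below.

```python
from typing import Iterable, Set


def classify_input_spec(unique_keys: Iterable[Set[str]], join_key: Set[str]) -> str:
    """Return the input spec name for one join side (illustrative model only)."""
    keys = [set(k) for k in unique_keys if k]
    if not keys:
        # Nothing is known to be unique on this side.
        return "NoUniqueKey"
    if any(k <= join_key for k in keys):
        # Some unique key is fully covered by the join key columns.
        return "JoinKeyContainsUniqueKey"
    # A unique key exists, but the join key does not cover it.
    return "HasUniqueKey"


# Left side of the first join: composite primary key, joined on one column.
left = classify_input_spec(
    [{"booking_channel_key", "origin_station_key", "destination_station_key"}],
    {"booking_channel_key"},
)
# Right side of the first join: primary key equals the join key.
right = classify_input_spec([{"booking_channel_key"}], {"booking_channel_key"})
# An intermediate result for which no unique key is known.
no_key = classify_input_spec([], {"passenger_key"})

print(left, right, no_key)
```

In this model, an input with no known unique key always falls into the `NoUniqueKey` case, whatever the join condition is.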

 

Example:

Take the example from 
[https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
{code:java}
CREATE TEMPORARY TABLE passengers (
  passenger_key STRING,
  first_name STRING,
  last_name STRING,
  update_time TIMESTAMP(3),
  PRIMARY KEY (passenger_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'passengers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);


CREATE TEMPORARY TABLE stations (
  station_key STRING,
  update_time TIMESTAMP(3),
  city STRING,
  PRIMARY KEY (station_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'stations',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE booking_channels (
  booking_channel_key STRING,
  update_time TIMESTAMP(3),
  channel STRING,
  PRIMARY KEY (booking_channel_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'booking_channels',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE train_activities (
  scheduled_departure_time TIMESTAMP(3),
  actual_departure_date TIMESTAMP(3),
  passenger_key STRING,
  origin_station_key STRING,
  destination_station_key STRING,
  booking_channel_key STRING,
  PRIMARY KEY (booking_channel_key, origin_station_key, 
destination_station_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'train_activities',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

SELECT 
  t.actual_departure_date, 
  p.first_name,
  p.last_name,
  b.channel, 
  os.city AS origin_station,
  ds.city AS destination_station
FROM train_activities_1 t
LEFT JOIN booking_channels b 
ON t.booking_channel_key = b.booking_channel_key
LEFT JOIN passengers p
ON t.passenger_key = p.passenger_key
LEFT JOIN stations os
ON t.origin_station_key = os.station_key
LEFT JOIN stations ds
ON t.destination_station_key = ds.station_key

{code}
 

 The query generates the following execution plan:

 
{code:java}
Flink SQL> explain
>  SELECT
>    t.actual_departure_date,
>    p.first_name,
>    p.last_name,
>    b.channel,
>    os.city AS origin_station,
>    ds.city AS destination_station
>  FROM train_activities_1 t
>  LEFT JOIN booking_channels b
>  ON t.booking_channel_key = b.booking_channel_key
>  LEFT JOIN passengers p
>  ON t.passenger_key = p.passenger_key
>  LEFT JOIN stations os
>  ON t.origin_station_key = os.station_key
>  LEFT JOIN stations ds
>  ON t.destination_station_key = ds.station_key;
== Abstract Syntax Tree ==
LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], 
channel=[$8], origin_station=[$15], destination_station=[$18])
+- LogicalJoin(condition=[=($4, $16)], joinType=[left])
   :- LogicalJoin(condition=[=($3, $13)], joinType=[left])
   :  :- LogicalJoin(condition=[=($2, $9)], joinType=[left])
   :  :  :- LogicalJoin(condition=[=($5, $6)], joinType=[left])
   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, 
train_activities_1]])
   :  :  :  +- LogicalWatermarkAssigner(rowtime=[update_time], watermark=[-($1, 
10000:INTERVAL SECOND)])
   :  :  :     +- LogicalTableScan(table=[[default_catalog, default_database, 
booking_channels]])
   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, 
passengers]])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
   +- LogicalTableScan(table=[[default_catalog, default_database, stations]])

== Optimized Physical Plan ==
Calc(select=[actual_departure_date, first_name, last_name, channel, city AS 
origin_station, city0 AS destination_station])
+- Join(joinType=[LeftOuterJoin], where=[=(destination_station_key, 
station_key)], select=[actual_departure_date, destination_station_key, channel, 
first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
   :- Exchange(distribution=[hash[destination_station_key]])
   :  +- Calc(select=[actual_departure_date, destination_station_key, channel, 
first_name, last_name, city])
   :     +- Join(joinType=[LeftOuterJoin], where=[=(origin_station_key, 
station_key)], select=[actual_departure_date, origin_station_key, 
destination_station_key, channel, first_name, last_name, station_key, city], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :- Exchange(distribution=[hash[origin_station_key]])
   :        :  +- Calc(select=[actual_departure_date, origin_station_key, 
destination_station_key, channel, first_name, last_name])
   :        :     +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, 
passenger_key0)], select=[actual_departure_date, passenger_key, 
origin_station_key, destination_station_key, channel, passenger_key0, 
first_name, last_name], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :        :- Exchange(distribution=[hash[passenger_key]])
   :        :        :  +- Calc(select=[actual_departure_date, passenger_key, 
origin_station_key, destination_station_key, channel])
   :        :        :     +- Join(joinType=[LeftOuterJoin], 
where=[=(booking_channel_key, booking_channel_key0)], 
select=[actual_departure_date, passenger_key, origin_station_key, 
destination_station_key, booking_channel_key, booking_channel_key0, channel], 
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :        :        :- 
Exchange(distribution=[hash[booking_channel_key]])
   :        :        :        :  +- Calc(select=[actual_departure_date, 
passenger_key, origin_station_key, destination_station_key, 
booking_channel_key])
   :        :        :        :     +- 
ChangelogNormalize(key=[booking_channel_key, origin_station_key, 
destination_station_key])
   :        :        :        :        +- 
Exchange(distribution=[hash[booking_channel_key, origin_station_key, 
destination_station_key]])
   :        :        :        :           +- 
TableSourceScan(table=[[default_catalog, default_database, 
train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, 
passenger_key, origin_station_key, destination_station_key, 
booking_channel_key])
   :        :        :        +- 
Exchange(distribution=[hash[booking_channel_key]])
   :        :        :           +- Calc(select=[booking_channel_key, channel])
   :        :        :              +- 
ChangelogNormalize(key=[booking_channel_key])
   :        :        :                 +- 
Exchange(distribution=[hash[booking_channel_key]])
   :        :        :                    +- 
TableSourceScan(table=[[default_catalog, default_database, booking_channels, 
watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, 
update_time, channel])
   :        :        +- Exchange(distribution=[hash[passenger_key]])
   :        :           +- Calc(select=[passenger_key, first_name, last_name])
   :        :              +- ChangelogNormalize(key=[passenger_key])
   :        :                 +- Exchange(distribution=[hash[passenger_key]])
   :        :                    +- TableSourceScan(table=[[default_catalog, 
default_database, passengers]], fields=[passenger_key, first_name, last_name, 
update_time])
   :        +- Exchange(distribution=[hash[station_key]])
   :           +- Calc(select=[station_key, city])
   :              +- ChangelogNormalize(key=[station_key])
   :                 +- Exchange(distribution=[hash[station_key]])
   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, stations]], fields=[station_key, update_time, city])
   +- Exchange(distribution=[hash[station_key]])
      +- Calc(select=[station_key, city])
         +- ChangelogNormalize(key=[station_key])
            +- Exchange(distribution=[hash[station_key]])
               +- TableSourceScan(table=[[default_catalog, default_database, 
stations]], fields=[station_key, update_time, city])

== Optimized Execution Plan ==
Calc(select=[actual_departure_date, first_name, last_name, channel, city AS 
origin_station, city0 AS destination_station])
+- Join(joinType=[LeftOuterJoin], where=[(destination_station_key = 
station_key)], select=[actual_departure_date, destination_station_key, channel, 
first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
   :- Exchange(distribution=[hash[destination_station_key]])
   :  +- Calc(select=[actual_departure_date, destination_station_key, channel, 
first_name, last_name, city])
   :     +- Join(joinType=[LeftOuterJoin], where=[(origin_station_key = 
station_key)], select=[actual_departure_date, origin_station_key, 
destination_station_key, channel, first_name, last_name, station_key, city], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :- Exchange(distribution=[hash[origin_station_key]])
   :        :  +- Calc(select=[actual_departure_date, origin_station_key, 
destination_station_key, channel, first_name, last_name])
   :        :     +- Join(joinType=[LeftOuterJoin], where=[(passenger_key = 
passenger_key0)], select=[actual_departure_date, passenger_key, 
origin_station_key, destination_station_key, channel, passenger_key0, 
first_name, last_name], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :        :- Exchange(distribution=[hash[passenger_key]])
   :        :        :  +- Calc(select=[actual_departure_date, passenger_key, 
origin_station_key, destination_station_key, channel])
   :        :        :     +- Join(joinType=[LeftOuterJoin], 
where=[(booking_channel_key = booking_channel_key0)], 
select=[actual_departure_date, passenger_key, origin_station_key, 
destination_station_key, booking_channel_key, booking_channel_key0, channel], 
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :        :        :- 
Exchange(distribution=[hash[booking_channel_key]])
   :        :        :        :  +- Calc(select=[actual_departure_date, 
passenger_key, origin_station_key, destination_station_key, 
booking_channel_key])
   :        :        :        :     +- 
ChangelogNormalize(key=[booking_channel_key, origin_station_key, 
destination_station_key])
   :        :        :        :        +- 
Exchange(distribution=[hash[booking_channel_key, origin_station_key, 
destination_station_key]])
   :        :        :        :           +- 
TableSourceScan(table=[[default_catalog, default_database, 
train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, 
passenger_key, origin_station_key, destination_station_key, 
booking_channel_key])
   :        :        :        +- 
Exchange(distribution=[hash[booking_channel_key]])
   :        :        :           +- Calc(select=[booking_channel_key, channel])
   :        :        :              +- 
ChangelogNormalize(key=[booking_channel_key])
   :        :        :                 +- 
Exchange(distribution=[hash[booking_channel_key]])
   :        :        :                    +- 
TableSourceScan(table=[[default_catalog, default_database, booking_channels, 
watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, 
update_time, channel])
   :        :        +- Exchange(distribution=[hash[passenger_key]])
   :        :           +- Calc(select=[passenger_key, first_name, last_name])
   :        :              +- ChangelogNormalize(key=[passenger_key])
   :        :                 +- Exchange(distribution=[hash[passenger_key]])
   :        :                    +- TableSourceScan(table=[[default_catalog, 
default_database, passengers]], fields=[passenger_key, first_name, last_name, 
update_time])
   :        +- Exchange(distribution=[hash[station_key]])(reuse_id=[1])
   :           +- Calc(select=[station_key, city])
   :              +- ChangelogNormalize(key=[station_key])
   :                 +- Exchange(distribution=[hash[station_key]])
   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, stations]], fields=[station_key, update_time, city])
   +- Reused(reference_id=[1])
{code}
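
One possible reading of the plan above is that the `Calc` following the first join no longer selects `booking_channel_key`, which is part of the declared primary key of the left input, so no complete key is left for the downstream joins. The Python sketch below models only the general rule that a key survives a projection when every one of its columns is kept. It is not Flink's metadata handling, and `surviving_keys` is a hypothetical helper introduced for illustration.

```python
from typing import List, Set


def surviving_keys(unique_keys: List[Set[str]], projected: Set[str]) -> List[Set[str]]:
    """Keep only the unique keys whose columns are all still projected."""
    return [k for k in unique_keys if k <= projected]


# Declared primary key of the train activities table in the example DDL.
pk = {"booking_channel_key", "origin_station_key", "destination_station_key"}

# Columns selected by the Calc after the first join in the plan.
calc_after_first_join = {
    "actual_departure_date",
    "passenger_key",
    "origin_station_key",
    "destination_station_key",
    "channel",
}

# booking_channel_key is missing, so the composite key does not survive.
lost = surviving_keys([pk], calc_after_first_join)

# If the projection also kept booking_channel_key, the key would survive.
kept = surviving_keys([pk], calc_after_first_join | {"booking_channel_key"})

print(lost)
print(kept == [pk])
```

Under this model, keeping all primary key columns in the projection would preserve the key. Whether doing so changes the plan that Flink actually produces has not been verified here.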
 

 


> UniqueKey constraint is lost with multiple sources join in SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-22113
>                 URL: https://issues.apache.org/jira/browse/FLINK-22113
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.0
>            Reporter: Fu Kai
>            Priority: Major
>
> Hi team,
>   
>  We have a use case that joins multiple data sources to generate a 
> continuously updated view. We defined a primary key constraint on every input 
> source, and all the keys are subsets of the join condition columns. All joins 
> are left joins.
>   
>  In our case, the first two inputs produce a *JoinKeyContainsUniqueKey* 
> input spec, which is good and performant. The third input source, however, is 
> joined with the intermediate output table of the first two input tables, and 
> that intermediate table does not carry key constraint information (although 
> the third source input table does), so the join gets a *NoUniqueKey* input 
> spec. Given that NoUniqueKey inputs have dramatic performance implications 
> per the [Force Join Unique 
> Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
>  email thread, we would like to know whether there is any mitigation for this.
>  
> Example:
> Take the example from 
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:java}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE train_activities (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, 
> destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> SELECT 
>   t.actual_departure_date, 
>   p.first_name,
>   p.last_name,
>   b.channel, 
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities_1 t
> LEFT JOIN booking_channels b 
> ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
> ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
> ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
> ON t.destination_station_key = ds.station_key
> {code}
>  
>  The query generates the following execution plan:
>  
> {code:java}
> Flink SQL> explain
> >  SELECT
> >    t.actual_departure_date,
> >    p.first_name,
> >    p.last_name,
> >    b.channel,
> >    os.city AS origin_station,
> >    ds.city AS destination_station
> >  FROM train_activities_1 t
> >  LEFT JOIN booking_channels b
> >  ON t.booking_channel_key = b.booking_channel_key
> >  LEFT JOIN passengers p
> >  ON t.passenger_key = p.passenger_key
> >  LEFT JOIN stations os
> >  ON t.origin_station_key = os.station_key
> >  LEFT JOIN stations ds
> >  ON t.destination_station_key = ds.station_key;
> == Abstract Syntax Tree ==
> LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], 
> channel=[$8], origin_station=[$15], destination_station=[$18])
> +- LogicalJoin(condition=[=($4, $16)], joinType=[left])
>    :- LogicalJoin(condition=[=($3, $13)], joinType=[left])
>    :  :- LogicalJoin(condition=[=($2, $9)], joinType=[left])
>    :  :  :- LogicalJoin(condition=[=($5, $6)], joinType=[left])
>    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, 
> train_activities_1]])
>    :  :  :  +- LogicalWatermarkAssigner(rowtime=[update_time], 
> watermark=[-($1, 10000:INTERVAL SECOND)])
>    :  :  :     +- LogicalTableScan(table=[[default_catalog, default_database, 
> booking_channels]])
>    :  :  +- LogicalTableScan(table=[[default_catalog, default_database, 
> passengers]])
>    :  +- LogicalTableScan(table=[[default_catalog, default_database, 
> stations]])
>    +- LogicalTableScan(table=[[default_catalog, default_database, 
> stations]])
>
> == Optimized Physical Plan ==
> Calc(select=[actual_departure_date, first_name, last_name, channel, city AS 
> origin_station, city0 AS destination_station])
> +- Join(joinType=[LeftOuterJoin], where=[=(destination_station_key, 
> station_key)], select=[actual_departure_date, destination_station_key, 
> channel, first_name, last_name, city, station_key, city0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :- Exchange(distribution=[hash[destination_station_key]])
>    :  +- Calc(select=[actual_departure_date, destination_station_key, 
> channel, first_name, last_name, city])
>    :     +- Join(joinType=[LeftOuterJoin], where=[=(origin_station_key, 
> station_key)], select=[actual_departure_date, origin_station_key, 
> destination_station_key, channel, first_name, last_name, station_key, city], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :- Exchange(distribution=[hash[origin_station_key]])
>    :        :  +- Calc(select=[actual_departure_date, origin_station_key, 
> destination_station_key, channel, first_name, last_name])
>    :        :     +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, 
> passenger_key0)], select=[actual_departure_date, passenger_key, 
> origin_station_key, destination_station_key, channel, passenger_key0, 
> first_name, last_name], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :- Exchange(distribution=[hash[passenger_key]])
>    :        :        :  +- Calc(select=[actual_departure_date, passenger_key, 
> origin_station_key, destination_station_key, channel])
>    :        :        :     +- Join(joinType=[LeftOuterJoin], 
> where=[=(booking_channel_key, booking_channel_key0)], 
> select=[actual_departure_date, passenger_key, origin_station_key, 
> destination_station_key, booking_channel_key, booking_channel_key0, channel], 
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :        :- 
> Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :        :  +- Calc(select=[actual_departure_date, 
> passenger_key, origin_station_key, destination_station_key, 
> booking_channel_key])
>    :        :        :        :     +- 
> ChangelogNormalize(key=[booking_channel_key, origin_station_key, 
> destination_station_key])
>    :        :        :        :        +- 
> Exchange(distribution=[hash[booking_channel_key, origin_station_key, 
> destination_station_key]])
>    :        :        :        :           +- 
> TableSourceScan(table=[[default_catalog, default_database, 
> train_activities_1]], fields=[scheduled_departure_time, 
> actual_departure_date, passenger_key, origin_station_key, 
> destination_station_key, booking_channel_key])
>    :        :        :        +- 
> Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :           +- Calc(select=[booking_channel_key, 
> channel])
>    :        :        :              +- 
> ChangelogNormalize(key=[booking_channel_key])
>    :        :        :                 +- 
> Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :                    +- 
> TableSourceScan(table=[[default_catalog, default_database, booking_channels, 
> watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, 
> update_time, channel])
>    :        :        +- Exchange(distribution=[hash[passenger_key]])
>    :        :           +- Calc(select=[passenger_key, first_name, last_name])
>    :        :              +- ChangelogNormalize(key=[passenger_key])
>    :        :                 +- Exchange(distribution=[hash[passenger_key]])
>    :        :                    +- TableSourceScan(table=[[default_catalog, 
> default_database, passengers]], fields=[passenger_key, first_name, last_name, 
> update_time])
>    :        +- Exchange(distribution=[hash[station_key]])
>    :           +- Calc(select=[station_key, city])
>    :              +- ChangelogNormalize(key=[station_key])
>    :                 +- Exchange(distribution=[hash[station_key]])
>    :                    +- TableSourceScan(table=[[default_catalog, 
> default_database, stations]], fields=[station_key, update_time, city])
>    +- Exchange(distribution=[hash[station_key]])
>       +- Calc(select=[station_key, city])
>          +- ChangelogNormalize(key=[station_key])
>             +- Exchange(distribution=[hash[station_key]])
>                +- TableSourceScan(table=[[default_catalog, default_database, 
> stations]], fields=[station_key, update_time, city])
> 
> == Optimized Execution Plan ==
> Calc(select=[actual_departure_date, first_name, last_name, channel, city AS 
> origin_station, city0 AS destination_station])
> +- Join(joinType=[LeftOuterJoin], where=[(destination_station_key = 
> station_key)], select=[actual_departure_date, destination_station_key, 
> channel, first_name, last_name, city, station_key, city0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :- Exchange(distribution=[hash[destination_station_key]])
>    :  +- Calc(select=[actual_departure_date, destination_station_key, 
> channel, first_name, last_name, city])
>    :     +- Join(joinType=[LeftOuterJoin], where=[(origin_station_key = 
> station_key)], select=[actual_departure_date, origin_station_key, 
> destination_station_key, channel, first_name, last_name, station_key, city], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :- Exchange(distribution=[hash[origin_station_key]])
>    :        :  +- Calc(select=[actual_departure_date, origin_station_key, 
> destination_station_key, channel, first_name, last_name])
>    :        :     +- Join(joinType=[LeftOuterJoin], where=[(passenger_key = 
> passenger_key0)], select=[actual_departure_date, passenger_key, 
> origin_station_key, destination_station_key, channel, passenger_key0, 
> first_name, last_name], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :- Exchange(distribution=[hash[passenger_key]])
>    :        :        :  +- Calc(select=[actual_departure_date, passenger_key, 
> origin_station_key, destination_station_key, channel])
>    :        :        :     +- Join(joinType=[LeftOuterJoin], 
> where=[(booking_channel_key = booking_channel_key0)], 
> select=[actual_departure_date, passenger_key, origin_station_key, 
> destination_station_key, booking_channel_key, booking_channel_key0, channel], 
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :        :- 
> Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :        :  +- Calc(select=[actual_departure_date, 
> passenger_key, origin_station_key, destination_station_key, 
> booking_channel_key])
>    :        :        :        :     +- 
> ChangelogNormalize(key=[booking_channel_key, origin_station_key, 
> destination_station_key])
>    :        :        :        :        +- 
> Exchange(distribution=[hash[booking_channel_key, origin_station_key, 
> destination_station_key]])
>    :        :        :        :           +- 
> TableSourceScan(table=[[default_catalog, default_database, 
> train_activities_1]], fields=[scheduled_departure_time, 
> actual_departure_date, passenger_key, origin_station_key, 
> destination_station_key, booking_channel_key])
>    :        :        :        +- 
> Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :           +- Calc(select=[booking_channel_key, 
> channel])
>    :        :        :              +- 
> ChangelogNormalize(key=[booking_channel_key])
>    :        :        :                 +- 
> Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :                    +- 
> TableSourceScan(table=[[default_catalog, default_database, booking_channels, 
> watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, 
> update_time, channel])
>    :        :        +- Exchange(distribution=[hash[passenger_key]])
>    :        :           +- Calc(select=[passenger_key, first_name, last_name])
>    :        :              +- ChangelogNormalize(key=[passenger_key])
>    :        :                 +- Exchange(distribution=[hash[passenger_key]])
>    :        :                    +- TableSourceScan(table=[[default_catalog, 
> default_database, passengers]], fields=[passenger_key, first_name, last_name, 
> update_time])
>    :        +- Exchange(distribution=[hash[station_key]])(reuse_id=[1])
>    :           +- Calc(select=[station_key, city])
>    :              +- ChangelogNormalize(key=[station_key])
>    :                 +- Exchange(distribution=[hash[station_key]])
>    :                    +- TableSourceScan(table=[[default_catalog, 
> default_database, stations]], fields=[station_key, update_time, city])
>    +- Reused(reference_id=[1])
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
