[jira] [Updated] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result

2020-08-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18988:
---
Labels: pull-request-available  (was: )

> Continuous query with LATERAL and LIMIT produces wrong result
> -
>
> Key: FLINK-18988
> URL: https://issues.apache.org/jira/browse/FLINK-18988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.1
>Reporter: Fabian Hueske
>Assignee: Danny Chen
>Priority: Critical
>  Labels: pull-request-available
>
> I was trying out the example queries provided in this blog post: 
> [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if 
> Flink supports the same and found that the queries were translated and 
> executed but produced the wrong result.
> I used the SQL Client and Kafka (running at kafka:9092) to store the table 
> data. I executed the following statements:
> {code:java}
> -- create cities table
> CREATE TABLE cities (
>   name STRING NOT NULL,
>   state STRING NOT NULL,
>   pop INT NOT NULL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'cities',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'mygroup', 
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
> -- fill cities table
> INSERT INTO cities VALUES
>   ('Los_Angeles', 'CA', 3979576),
>   ('Phoenix', 'AZ', 1680992),
>   ('Houston', 'TX', 2320268),
>   ('San_Diego', 'CA', 1423851),
>   ('San_Francisco', 'CA', 881549),
>   ('New_York', 'NY', 8336817),
>   ('Dallas', 'TX', 1343573),
>   ('San_Antonio', 'TX', 1547253),
>   ('San_Jose', 'CA', 1021795),
>   ('Chicago', 'IL', 2695598),
>   ('Austin', 'TX', 978908);
> -- execute query
> SELECT state, name 
> FROM
>   (SELECT DISTINCT state FROM cities) states,
>   LATERAL (
> SELECT name, pop
> FROM cities
> WHERE state = states.state
> ORDER BY pop
> DESC LIMIT 3
>   );
> -- result
> state  name
>CA   Los_Angeles
>NY  New_York
>IL   Chicago
> -- expected result
> state | name
> --+-
> TX    | Dallas
> AZ    | Phoenix
> IL    | Chicago
> TX    | Houston
> CA    | San_Jose
> NY    | New_York
> CA    | San_Diego
> CA    | Los_Angeles
> TX    | San_Antonio
> {code}
> As you can see from the query result, Flink computes the top3 cities over all 
> states, not for every state individually. Hence, I assume that this is a bug 
> in the query optimizer or one of the rewriting rules.
> There are two valid ways to solve this issue:
>  * Fixing the rewriting rules / optimizer (obviously preferred)
>  * Disabling this feature and throwing an exception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result

2020-08-18 Thread Fabian Hueske (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-18988:
--
Description: 
I was trying out the example queries provided in this blog post: 
[https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if 
Flink supports the same and found that the queries were translated and executed 
but produced the wrong result.

I used the SQL Client and Kafka (running at kafka:9092) to store the table 
data. I executed the following statements:
{code:java}
-- create cities table
CREATE TABLE cities (
  name STRING NOT NULL,
  state STRING NOT NULL,
  pop INT NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'cities',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'mygroup', 
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- fill cities table
INSERT INTO cities VALUES
  ('Los_Angeles', 'CA', 3979576),
  ('Phoenix', 'AZ', 1680992),
  ('Houston', 'TX', 2320268),
  ('San_Diego', 'CA', 1423851),
  ('San_Francisco', 'CA', 881549),
  ('New_York', 'NY', 8336817),
  ('Dallas', 'TX', 1343573),
  ('San_Antonio', 'TX', 1547253),
  ('San_Jose', 'CA', 1021795),
  ('Chicago', 'IL', 2695598),
  ('Austin', 'TX', 978908);

-- execute query
SELECT state, name 
FROM
  (SELECT DISTINCT state FROM cities) states,
  LATERAL (
SELECT name, pop
FROM cities
WHERE state = states.state
ORDER BY pop
DESC LIMIT 3
  );

-- result
state  name
   CA   Los_Angeles
   NY  New_York
   IL   Chicago

-- expected result
state | name
--+-
TX    | Dallas
AZ    | Phoenix
IL    | Chicago
TX    | Houston
CA    | San_Jose
NY    | New_York
CA    | San_Diego
CA    | Los_Angeles
TX    | San_Antonio

{code}
As you can see from the query result, Flink computes the top3 cities over all 
states, not for every state individually. Hence, I assume that this is a bug in 
the query optimizer or one of the rewriting rules.

There are two valid ways to solve this issue:
 * Fixing the rewriting rules / optimizer (obviously preferred)
 * Disabling this feature and throwing an exception

  was:
I was trying out the example queries provided in this blog post: 
[https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if 
Flink supports the same and found that the queries were translated and executed 
but produced the wrong result.

I used the SQL Client and Kafka (running at kafka:9092) to store the table 
data. I executed the following statements:
{code:java}
-- create cities table
CREATE TABLE cities (
  name STRING NOT NULL,
  state STRING NOT NULL,
  pop INT NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'cities',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'mygroup', 
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- fill cities table
INSERT INTO cities VALUES
  ('Los_Angeles', 'CA', 3979576),
  ('Phoenix', 'AZ', 1680992),
  ('Houston', 'TX', 2320268),
  ('San_Diego', 'CA', 1423851),
  ('San_Francisco', 'CA', 881549),
  ('New_York', 'NY', 8336817),
  ('Dallas', 'TX', 1343573),
  ('San_Antonio', 'TX', 1547253),
  ('San_Jose', 'CA', 1021795),
  ('Chicago', 'IL', 2695598),
  ('Austin', 'TX', 978908);

-- execute query
SELECT state, name 
FROM
  (SELECT DISTINCT state FROM cities) states,
  LATERAL (
SELECT name, pop
FROM cities
WHERE state = states.state
ORDER BY pop
DESC LIMIT 3
  );

-- result
state  name
   CA   Los_Angeles
   NY  New_York
   IL   Chicago

-- expected result
state | name
--+-
TX    | Dallas
AZ    | Phoenix
IL    | Chicago
TX    | Houston
CA    | San_Jose
NY    | New_York
CA    | San_Diego
CA    | Los_Angeles
TX    | San_Antonio

{code}
As you can see from the query result, Flink computes the top3 cities over all 
states, not for every state individually. Hence, I assume that this is a bug in 
the query optimizer or one of the rewriting rules.


> Continuous query with LATERAL and LIMIT produces wrong result
> -
>
> Key: FLINK-18988
> URL: https://issues.apache.org/jira/browse/FLINK-18988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.1
>Reporter: Fabian Hueske
>Priority: Critical
>
> I was trying out the example queries provided in this blog post: 
> [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if 
> Flink supports the same and found that the queries were translated and 
> executed but produced the wrong result.
> I used the SQL Client and Kafka (running at kafka:9092) to store the table 
> data. I executed the following statements:
> {code:java}
> -- create