[ https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Francesco Guardiani updated FLINK-26549:
----------------------------------------
    Description: 
While working on casting, I've found an interesting bug in the INSERT INTO VALUES type inference. It comes from {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (see this version in particular: https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).

The test scenario is an INSERT INTO VALUES query that also pushes some metadata to a Kafka table, in particular writing the headers metadata.

The table is declared as follows:

{code:sql}
 CREATE TABLE kafka (
  `physical_1` STRING,
  `physical_2` INT,
  `timestamp-type` STRING METADATA VIRTUAL,
  `timestamp` TIMESTAMP(3) METADATA,
  `leader-epoch` INT METADATA VIRTUAL,
  `headers` MAP<STRING, BYTES> METADATA,
  `partition` INT METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
  `physical_3` BOOLEAN
) WITH (
   'connector' = 'kafka',
   [...]
)
{code}

The INSERT INTO query looks like this:

{code:sql}
INSERT INTO kafka VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
{code}
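
For reference, the plans shown below can be obtained by explaining the same statement. This is only a sketch, assuming a Flink SQL session with the {{kafka}} table registered as above:

{code:sql}
-- Sketch: EXPLAIN prints the abstract syntax tree, the optimized physical plan
-- and the optimized execution plan without executing the INSERT.
EXPLAIN PLAN FOR
INSERT INTO kafka VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE);
{code}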

Note that the byte literals in the first row are up to 3 bytes long, while in the last row they are only 1 byte long.
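
The expected behaviour would be for the map value type to be widened across all rows (to {{VARBINARY(3)}}, and ultimately to the sink's {{BYTES}}), not narrowed to {{BINARY(1)}}. A minimal, hypothetical probe of just this inference, independent of the Kafka table and assuming a Flink SQL session:

{code:sql}
-- Hypothetical probe (not part of the test): the value type inferred for `headers`
-- over this union of rows should widen to VARBINARY(3), not narrow to BINARY(1).
SELECT headers FROM (VALUES
  (MAP['k1', x'C0FFEE', 'k2', x'BABE']),
  (MAP['k1', X'10', 'k2', X'20'])
) AS t(headers);
{code}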

The generated plan of this INSERT INTO is:

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
         +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
      +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(1)) MAP) AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
   :- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Reused(reference_id=[1])
   +- Calc(select=['data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
      +- Reused(reference_id=[1])
{code}

As you can see, in the _Abstract Syntax Tree_ section a cast for the headers is injected (although unnecessary, as it should be an identity cast), but then in the _Optimized Physical Plan_ yet another cast is injected:

{code}
CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers
{code}

This makes no sense: it casts the map values first to {{BINARY(1)}} and then to {{BYTES}}, trimming the last 2 bytes (a worked example of the trimming is sketched at the end of this description). Removing the last row from the INSERT makes the VALUES type inference work properly:

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[$3], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      +- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
         +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
      +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
   +- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
      +- Reused(reference_id=[1])
{code}  
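
For completeness, the effect of the injected cast chain on the first row's header value can be reproduced in isolation. A minimal sketch, assuming a Flink SQL session, of the trimming described above:

{code:sql}
-- The inner cast narrows the 3-byte literal to BINARY(1); the outer cast back to
-- BYTES cannot restore the dropped bytes, so only x'C0' remains.
SELECT CAST(CAST(x'C0FFEE' AS BINARY(1)) AS BYTES) AS trimmed_header_value;
{code}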



> INSERT INTO with VALUES leads to wrong type inference with nested types
> -----------------------------------------------------------------------
>
>                 Key: FLINK-26549
>                 URL: https://issues.apache.org/jira/browse/FLINK-26549
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Francesco Guardiani
>            Priority: Major
>



