[jira] [Commented] (FLINK-26782) Remove PlannerExpression and related

2022-04-13 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522095#comment-17522095
 ] 

Francesco Guardiani commented on FLINK-26782:
-

I wasn't able to complete this, as it first needs a solution for the legacy 
UDFs, because those still rely on PlannerExpression. Either we remove the 
legacy UDFs, or we adapt them to the new type inference stack (see the sketch 
after the commit link below).

This is the commit that performs the removal: 
https://github.com/apache/flink/pull/19460/commits/fc7865ccd9ea09f5e3a2181af7b05f837f54caa9
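
For context, a minimal sketch of what adapting a legacy UDF to the new type 
inference stack looks like (the function itself is made up for illustration; 
the relevant part is overriding {{getTypeInference}} instead of relying on the 
PlannerExpression-based inference):

{code:java}
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

/** Hypothetical scalar function declaring its types via the new type inference stack. */
public class KeyedConcatFunction extends ScalarFunction {

    public String eval(String key, Integer value) {
        return key + ":" + value;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        // Argument and result types are declared explicitly here, so the planner
        // no longer needs the legacy PlannerExpression-based inference.
        return TypeInference.newBuilder()
                .typedArguments(DataTypes.STRING(), DataTypes.INT())
                .outputTypeStrategy(callContext -> Optional.of(DataTypes.STRING()))
                .build();
    }
}
{code}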

> Remove PlannerExpression and related
> 
>
> Key: FLINK-26782
> URL: https://issues.apache.org/jira/browse/FLINK-26782
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26782) Remove PlannerExpression and related

2022-04-13 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26782:
---

Assignee: (was: Francesco Guardiani)

> Remove PlannerExpression and related
> 
>
> Key: FLINK-26782
> URL: https://issues.apache.org/jira/browse/FLINK-26782
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25809) Introduce test infra for building FLIP-190 tests

2022-04-13 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521664#comment-17521664
 ] 

Francesco Guardiani commented on FLINK-25809:
-

Unfortunately I won't have time to work on this anymore. I had a couple of 
ideas in mind, so here is my WIP for whoever wants to take this over: 
[https://github.com/slinkydeveloper/flink/tree/upgrade-tests]. Note that the 
branch is quite old, and in the meantime we have reworked a couple of things, 
starting from the MiniClusterExtension, so it definitely needs a lot of work.

> Introduce test infra for building FLIP-190 tests 
> -
>
> Key: FLINK-25809
> URL: https://issues.apache.org/jira/browse/FLINK-25809
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> FLIP-190 requires building new test infra. For this test infra, we want to 
> define test cases and data once, and then for each case execute the following:
> * An integration test that roughly does {{create plan -> execute job -> trigger 
> savepoint -> stop job -> restore plan -> restore savepoint -> execute job -> 
> stop and assert}} (a rough sketch of the plan part of this flow follows below). 
> Plan and savepoint should be committed to git, so running these tests when a 
> plan and savepoint are already available will not regenerate them.
> * A change detection test to check that, for the defined test cases, the plan 
> hasn't changed. Similar to the existing {{JsonPlanITCase}} tests.
> * A completeness/coverage check, that is, count how many times ExecNodes 
> (including versions) are used in the test cases and fail if an ExecNode 
> version is never covered.
> Other requirements include "versioning" the test cases, that is, for each 
> test case we can retain different versions of the plan and savepoint, in 
> order to make sure that after we introduce a new plan change, the old plan 
> still continues to run.
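> A rough sketch of the compile/restore part of that flow (hypothetical test 
> code: the plan directory, table DDL and query are made up, and savepoint 
> triggering/restoring is omitted):
> {code:java}
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
> 
> import org.apache.flink.table.api.CompiledPlan;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.PlanReference;
> import org.apache.flink.table.api.TableEnvironment;
> 
> public class PlanUpgradeSketch {
>     // Placeholder for wherever the committed plans would live in the repo.
>     private static final String PLAN_DIR = "src/test/resources/plans";
> 
>     public static void main(String[] args) throws Exception {
>         TableEnvironment env =
>                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
>         env.executeSql(
>                 "CREATE TABLE source (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
>         env.executeSql("CREATE TABLE sink (x INT) WITH ('connector' = 'blackhole')");
> 
>         Path planFile = Paths.get(PLAN_DIR, "q1.json");
>         if (!Files.exists(planFile)) {
>             // First run: compile the plan once and commit it to git; never regenerate it.
>             CompiledPlan plan = env.compilePlanSql("INSERT INTO sink SELECT * FROM source");
>             plan.writeToFile(planFile.toFile());
>         }
> 
>         // Later runs: restore the committed plan (and, in the real infra, the savepoint)
>         // and execute it to assert that the old plan still runs.
>         CompiledPlan restored = env.loadPlan(PlanReference.fromFile(planFile.toFile()));
>         restored.execute().await();
>     }
> }
> {code}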



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25809) Introduce test infra for building FLIP-190 tests

2022-04-13 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-25809:
---

Assignee: (was: Francesco Guardiani)

> Introduce test infra for building FLIP-190 tests 
> -
>
> Key: FLINK-25809
> URL: https://issues.apache.org/jira/browse/FLINK-25809
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Priority: Major
>
> FLIP-190 requires building new test infra. For this test infra, we want to 
> define test cases and data once, and then for each case execute the following:
> * An integration test that roughly does {{create plan -> execute job -> trigger 
> savepoint -> stop job -> restore plan -> restore savepoint -> execute job -> 
> stop and assert}}. Plan and savepoint should be committed to git, so running 
> these tests when a plan and savepoint are already available will not 
> regenerate them.
> * A change detection test to check that, for the defined test cases, the plan 
> hasn't changed. Similar to the existing {{JsonPlanITCase}} tests.
> * A completeness/coverage check, that is, count how many times ExecNodes 
> (including versions) are used in the test cases and fail if an ExecNode 
> version is never covered.
> Other requirements include "versioning" the test cases, that is, for each 
> test case we can retain different versions of the plan and savepoint, in 
> order to make sure that after we introduce a new plan change, the old plan 
> still continues to run.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-04-13 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26549:
---

Assignee: (was: Francesco Guardiani)

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Priority: Major
> Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, 
> Least_restrictive_issue.patch
>
>
> While working on casting, I found out we have an interesting bug in the 
> INSERT INTO VALUES type inference. It comes from 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular: 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query that also pushes some 
> metadata to a Kafka table, in particular the headers metadata.
> The table is declared like this:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP<STRING, BYTES> METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, 
> BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), 
> _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT 
> NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>:- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
> physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>+- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
> physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
> X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINAR

[jira] [Assigned] (FLINK-26952) Remove old CSV connector

2022-04-13 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26952:
---

Assignee: (was: Francesco Guardiani)

> Remove old CSV connector
> 
>
> Key: FLINK-26952
> URL: https://issues.apache.org/jira/browse/FLINK-26952
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Priority: Major
>
> This involves removing all references to the old CSV connector in our tests 
> (mostly TableEnvironmentITCase, TableEnvironmentTest and TableITCase) and 
> removing its usage from a couple of e2e tests.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27201) Remove CollectTableSink

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27201:
---

 Summary: Remove CollectTableSink
 Key: FLINK-27201
 URL: https://issues.apache.org/jira/browse/FLINK-27201
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27200) Replace CollectionTableSource and CollectionTableSink with new table stack alternatives

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27200:
---

 Summary: Replace CollectionTableSource and CollectionTableSink 
with new table stack alternatives
 Key: FLINK-27200
 URL: https://issues.apache.org/jira/browse/FLINK-27200
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


We need to replace the old CollectionTableSource and CollectionTableSink with 
alternatives from the new stack, for example TestValuesTableFactory or some 
simpler and more straightforward alternative based on TableFactoryHarness.

This task requires identifying the alternative for the old 
CollectionTableSource and CollectionTableSink, possibly creating a new one, 
and then replacing their usage in tests; a sketch of the TestValuesTableFactory 
option follows below.
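
A minimal sketch of the TestValuesTableFactory option (assuming the test lives 
in flink-table-planner, where the 'values' connector is available; the table 
name and data are made up):

{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;

public class ValuesSourceExample {

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register the in-memory rows and reference them through the generated data-id.
        List<Row> data = Arrays.asList(Row.of(1, "a"), Row.of(2, "b"));
        String dataId = TestValuesTableFactory.registerData(data);

        env.executeSql(
                "CREATE TABLE src (id INT, name STRING) WITH ("
                        + "'connector' = 'values', 'data-id' = '" + dataId + "', 'bounded' = 'true')");

        env.executeSql("SELECT * FROM src").print();
    }
}
{code}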



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27198) Cleanup tests testing legacy TableSink/TableSource

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27198:
---

 Summary: Cleanup tests testing legacy TableSink/TableSource
 Key: FLINK-27198
 URL: https://issues.apache.org/jira/browse/FLINK-27198
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


We have a lot of mock TableSink/TableSource implementations used only for 
testing features of the old TableSink/TableSource stack. We should identify 
them and remove them together with the tests. A non-exhaustive list:


{{Implementations of TableSink in  (25 usages found)}}
{{    Test  (14 usages found)}}
{{        flink-python_2.12  (4 usages found)}}
{{            org.apache.flink.table.legacyutils  (4 usages found)}}
{{                legacyTestingSinks.scala  (3 usages found)}}
{{                    39 private[flink] class TestAppendSink extends 
AppendStreamTableSink[Row] {}}
{{                    69 private[flink] class TestRetractSink extends 
RetractStreamTableSink[Row] {}}
{{                    95 private[flink] class TestUpsertSink(}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    168 class CollectionTableSink(val outputType: 
RowTypeInfo)}}
{{        flink-table-common  (1 usage found)}}
{{            org.apache.flink.table.utils  (1 usage found)}}
{{                TypeMappingUtilsTest.java  (1 usage found)}}
{{                    419 private static class TestTableSink implements 
TableSink> {}}
{{        flink-table-planner_2.12  (9 usages found)}}
{{            org.apache.flink.table.planner.factories.utils  (1 usage found)}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    152 class CollectionTableSink(val schema: TableSchema)}}
{{            org.apache.flink.table.planner.runtime.batch.sql  (1 usage 
found)}}
{{                PartitionableSinkITCase.scala  (1 usage found)}}
{{                    331 private class TestSink(}}
{{            org.apache.flink.table.planner.runtime.utils  (3 usages found)}}
{{                StreamTestSink.scala  (3 usages found)}}
{{                    265 final class TestingUpsertTableSink(val keys: 
Array[Int], val tz: TimeZone)}}
{{                    344 final class TestingAppendTableSink(tz: TimeZone) 
extends AppendStreamTableSink[Row] {}}
{{                    499 final class TestingRetractTableSink(tz: TimeZone) 
extends RetractStreamTableSink[Row] {}}
{{            org.apache.flink.table.planner.utils  (4 usages found)}}
{{                MemoryTableSourceSinkUtil.scala  (3 usages found)}}
{{                    80 final class UnsafeMemoryAppendTableSink}}
{{                    146 final class DataTypeOutputFormatTableSink(}}
{{                    181 final class DataTypeAppendStreamTableSink(}}
{{                testTableSourceSinks.scala  (1 usage found)}}
{{                    1317 class OptionsTableSink(}}

{{Implementations of TableSource in  (31 usages found)}}
{{    Production  (8 usages found)}}
{{        flink-batch-sql-test  (1 usage found)}}
{{            org.apache.flink.sql.tests  (1 usage found)}}
{{                BatchSQLTestProgram.java  (1 usage found)}}
{{                    79 public static class GeneratorTableSource extends 
InputFormatTableSource {}}
{{        flink-stream-sql-test_2.12  (1 usage found)}}
{{            org.apache.flink.sql.tests  (1 usage found)}}
{{                StreamSQLTestProgram.java  (1 usage found)}}
{{                    192 public static class GeneratorTableSource}}
{{    Test  (23 usages found)}}
{{        flink-python_2.12  (1 usage found)}}
{{            org.apache.flink.table.legacyutils  (1 usage found)}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    127 class CollectionTableSource(}}
{{        flink-sql-client_2.12  (1 usage found)}}
{{            org.apache.flink.table.client.gateway.utils  (1 usage found)}}
{{                SimpleCatalogFactory.java  (1 usage found)}}
{{                    95 new StreamTableSource() {}}
{{        flink-table-api-java  (1 usage found)}}
{{            org.apache.flink.table.utils  (1 usage found)}}
{{                TableSourceMock.java  (1 usage found)}}
{{                    27 public class TableSourceMock implements 
TableSource {}}
{{        flink-table-common  (1 usage found)}}
{{            org.apache.flink.table.utils  (1 usage found)}}
{{                TypeMappingUtilsTest.java  (1 usage found)}}
{{                    375 private static class TestTableSource}}
{{        flink-table-planner_2.12  (19 usages found)}}
{{            org.apache.flink.table.planner.factories.utils  (1 usage found)}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    115 class CollectionTableSource(}}
{{            org.apache.flink.table.planner.plan.stream.sql.join  (2 usages 
found)}}
{{                LookupJoinTest.scala  (2 usages found)}}
{{                

[jira] [Created] (FLINK-27197) Port ArrowTableSource and PythonInputFormatTableSource to DynamicTableSource

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27197:
---

 Summary: Port ArrowTableSource and PythonInputFormatTableSource to 
DynamicTableSource
 Key: FLINK-27197
 URL: https://issues.apache.org/jira/browse/FLINK-27197
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27196) Remove old format interfaces

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27196:
---

 Summary: Remove old format interfaces
 Key: FLINK-27196
 URL: https://issues.apache.org/jira/browse/FLINK-27196
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24700) Clarify semantics of filter, projection, partition, and metadata pushdown

2022-04-11 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520654#comment-17520654
 ] 

Francesco Guardiani commented on FLINK-24700:
-

[~twalthr], has this one already been solved by one of the previous interface 
reworking efforts?

> Clarify semantics of filter, projection, partition, and metadata pushdown
> -
>
> Key: FLINK-24700
> URL: https://issues.apache.org/jira/browse/FLINK-24700
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> FLINK-19903 has revealed a couple of shortcomings that occur when 
> implementing multiple ability interfaces (a skeleton showing where the 
> questions arise follows below). We should improve the documentation and 
> better define the semantics.
> - Push the produced type not only for metadata pushdown but also for 
> projection pushdown.
> - Clarify the order of filter + projection.
> - Clarify the order of projection/filter + partition.
> - Simplify the handling of partition columns.
> ...
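> For illustration, a stripped-down skeleton of where these questions show up in 
> a source implementing multiple abilities (signatures assumed from recent Flink 
> versions and may differ; a real source would also implement ScanTableSource, 
> which is omitted here):
> {code:java}
> import java.util.Collections;
> import java.util.List;
> 
> import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
> import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
> import org.apache.flink.table.expressions.ResolvedExpression;
> import org.apache.flink.table.types.DataType;
> 
> public class AbilitiesSkeleton implements SupportsFilterPushDown, SupportsProjectionPushDown {
> 
>     @Override
>     public Result applyFilters(List<ResolvedExpression> filters) {
>         // Which filters arrive here if a projection or partition pushdown already happened?
>         return Result.of(Collections.emptyList(), filters);
>     }
> 
>     @Override
>     public boolean supportsNestedProjection() {
>         return false;
>     }
> 
>     @Override
>     public void applyProjection(int[][] projectedFields, DataType producedDataType) {
>         // Does producedDataType already account for pushed metadata columns at this point?
>     }
> }
> {code}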



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27185) Run the assertj conversion script to convert assertions in connectors and formats

2022-04-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27185:
---

 Summary: Run the assertj conversion script to convert assertions 
in connectors and formats
 Key: FLINK-27185
 URL: https://issues.apache.org/jira/browse/FLINK-27185
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25684) Support enhanced show databases syntax

2022-04-08 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17519492#comment-17519492
 ] 

Francesco Guardiani commented on FLINK-25684:
-

Hi [~jark] [~lzljs3620320] [~ZhangChaoming], I left a comment on the PR about 
whether it makes sense to have this feature or not. I'm going to repost it 
here for broader visibility:
{quote}One thing about this PR I'm not sure about is that we're adding 
support only for LIKE, which is a very specific feature and might not cover 
all the use cases out there for querying the catalog and seeing what tables 
and databases you have.

It makes sense, but on the other hand I think it's limited and very specific. 
I wonder whether we should approach this problem from a different angle, 
which is not to keep adding syntax, but rather to go in the direction that 
other traditional databases already take: allow querying catalog information 
(databases, tables, functions, etc.) as if it were other tables, by reusing 
everything we already have. This would allow users to query catalog 
information (tables, databases, functions, etc.) as if it were normal Flink 
tables, using SELECT and every other functionality provided by the projection, 
filter and calc operators. As I said, this is the approach taken by most 
traditional databases, e.g. MySQL's information_schema.

Implementation-wise, it should even be relatively easy, as you just need to 
implement a DynamicTableSource that queries the catalog and then automatically 
register it.
{quote}
Do you have any opinions about this? Wouldn't it be a better and more 
definitive solution to the problem?
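
For reference, the user-facing experience I have in mind would be roughly the 
following (hypothetical syntax, modeled on MySQL's information_schema; Flink 
does not expose such tables today):

{code:sql}
-- Hypothetical: query catalog metadata as a regular table instead of
-- adding more SHOW ... LIKE syntax variants.
SELECT database_name
FROM information_schema.`databases`
WHERE database_name LIKE 'db%'
ORDER BY database_name;
{code}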

> Support enhanced show databases syntax
> --
>
> Key: FLINK-25684
> URL: https://issues.apache.org/jira/browse/FLINK-25684
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Moses
>Assignee: Moses
>Priority: Major
>  Labels: pull-request-available
>
> An enhanced `show databases` statement like `show databases like 'db%'` 
> is broadly supported in many popular SQL engines like Spark SQL/MySQL.
> We could use such a statement to easily show the databases that we want.
> h3. SHOW DATABASES [ LIKE regex_pattern ]
> Examples:
> {code:java}
> Flink SQL> create database db1;
> [INFO] Execute statement succeed.
> Flink SQL> create database db1_1;
> [INFO] Execute statement succeed.
> Flink SQL> create database pre_db;
> [INFO] Execute statement succeed.
> Flink SQL> show databases;
> +------------------+
> |    database name |
> +------------------+
> | default_database |
> |              db1 |
> |            db1_1 |
> |           pre_db |
> +------------------+
> 4 rows in set
> Flink SQL> show databases like 'db1';
> +---------------+
> | database name |
> +---------------+
> |           db1 |
> +---------------+
> 1 row in set
> Flink SQL> show databases like 'db%';
> +---------------+
> | database name |
> +---------------+
> |           db1 |
> |         db1_1 |
> +---------------+
> 2 rows in set
> Flink SQL> show databases like '%db%';
> +---------------+
> | database name |
> +---------------+
> |           db1 |
> |         db1_1 |
> |        pre_db |
> +---------------+
> 3 rows in set
> Flink SQL> show databases like '%db';
> +---------------+
> | database name |
> +---------------+
> |        pre_db |
> +---------------+
> 1 row in set
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-27131) CastFunctionITCase.test failed on azure

2022-04-08 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-27131:
---

Assignee: Francesco Guardiani

> CastFunctionITCase.test failed on azure
> ---
>
> Key: FLINK-27131
> URL: https://issues.apache.org/jira/browse/FLINK-27131
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Apr 07 20:09:45 [ERROR] Tests run: 513, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 13.125 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.CastFunctionITCase
> Apr 07 20:09:45 [ERROR] 
> org.apache.flink.table.planner.functions.CastFunctionITCase.test(TestCase)[342]
>   Time elapsed: 0.008 s  <<< ERROR!
> Apr 07 20:09:45 org.apache.flink.api.common.InvalidProgramException: The 
> LocalStreamEnvironment cannot be used when submitting a program through a 
> client, or running in a TestEnvironment context.
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.validateAndGetConfiguration(LocalStreamEnvironment.java:56)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.(LocalStreamEnvironment.java:51)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:2320)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$15(StreamExecutionEnvironment.java:2262)
> Apr 07 20:09:45   at java.util.Optional.orElseGet(Optional.java:267)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2262)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.delegation.DefaultExecutorFactory.create(DefaultExecutorFactory.java:48)
> Apr 07 20:09:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:278)
> Apr 07 20:09:45   at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:299)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Apr 07 20:09:45   at 
> sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
> Apr 07 20:09:45   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Apr 07 20:09:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 07 20:09:45   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> Apr 07 20:09:45   at 

[jira] [Commented] (FLINK-27131) CastFunctionITCase.test failed on azure

2022-04-08 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17519376#comment-17519376
 ] 

Francesco Guardiani commented on FLINK-27131:
-

Sounds like a duplicate of https://issues.apache.org/jira/browse/FLINK-26979, 
investigating it

> CastFunctionITCase.test failed on azure
> ---
>
> Key: FLINK-27131
> URL: https://issues.apache.org/jira/browse/FLINK-27131
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Apr 07 20:09:45 [ERROR] Tests run: 513, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 13.125 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.CastFunctionITCase
> Apr 07 20:09:45 [ERROR] 
> org.apache.flink.table.planner.functions.CastFunctionITCase.test(TestCase)[342]
>   Time elapsed: 0.008 s  <<< ERROR!
> Apr 07 20:09:45 org.apache.flink.api.common.InvalidProgramException: The 
> LocalStreamEnvironment cannot be used when submitting a program through a 
> client, or running in a TestEnvironment context.
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.validateAndGetConfiguration(LocalStreamEnvironment.java:56)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.(LocalStreamEnvironment.java:51)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:2320)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$15(StreamExecutionEnvironment.java:2262)
> Apr 07 20:09:45   at java.util.Optional.orElseGet(Optional.java:267)
> Apr 07 20:09:45   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2262)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.delegation.DefaultExecutorFactory.create(DefaultExecutorFactory.java:48)
> Apr 07 20:09:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:278)
> Apr 07 20:09:45   at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:299)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Apr 07 20:09:45   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Apr 07 20:09:45   at 
> sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
> Apr 07 20:09:45   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Apr 07 20:09:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 07 20:09:45   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> Apr 07 20:09:45   at 
> org.junit.jupiter.engine.execution.Executable

[jira] [Comment Edited] (FLINK-26760) The new CSV source (file system source + CSV format) does not support reading files whose file encoding is not UTF-8

2022-04-04 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516747#comment-17516747
 ] 

Francesco Guardiani edited comment on FLINK-26760 at 4/4/22 11:04 AM:
--

OK, just checked the TPC-DS spec:

> Comment: The data generated by dsdgen includes some international characters. 
> Examples of international characters are Ô and É. The database must preserve 
> these characters during loading and processing by using a character encoding 
> such as ISO/IEC 8859-1 that includes these characters.

from [https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf]

I guess we need either automatic detection of the encoding, or some way to 
specify it?
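
For the "some way to specify it" option, the format would essentially need to 
decode the raw bytes with a configurable charset instead of assuming UTF-8. A 
plain-Java illustration (not the actual CSV format code):

{code:java}
import java.nio.charset.Charset;

public final class CharsetDecodingExample {

    // Decodes a raw CSV record with an explicitly configured charset (e.g. ISO-8859-1)
    // instead of assuming UTF-8.
    public static String decode(byte[] rawRecord, String charsetName) {
        return new String(rawRecord, Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        byte[] latin1 = {(byte) 0x52, (byte) 0xC9, (byte) 0x55}; // "RÉU" in ISO-8859-1
        System.out.println(decode(latin1, "ISO-8859-1")); // prints RÉU
        System.out.println(decode(latin1, "UTF-8"));      // 0xC9 becomes a replacement char
    }
}
{code}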


was (Author: slinkydeveloper):
Ok just checked the TPC-DS spec:

> Comment: The data generated by dsdgen includes some international characters. 
> Examples of international characters are Ô and É. The database must preserve 
> these characters during loading and processing by using a character encoding 
> such as ISO/IEC 8859-1 that includes these characters.

from https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf

> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8
> 
>
> Key: FLINK-26760
> URL: https://issues.apache.org/jira/browse/FLINK-26760
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Lijie Wang
>Assignee: Arvid Heise
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: PerformanceTest.java, example.csv
>
>
> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8, but the legacy {{CsvTableSource}} 
> supports it.
> We provide an {{*example.csv*}} whose file encoding is {{{}ISO-8859-1{}}}.
> When reading it with the legacy {{{}CsvTableSource{}}}, it executes correctly:
> {code:java}
> @Test
> public void testLegacyCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> CsvTableSource.Builder builder = CsvTableSource.builder();
> CsvTableSource source =
> builder.path("example.csv")
> .emptyColumnAsNull()
> .lineDelimiter("\n")
> .fieldDelimiter("|")
> .field("name", DataTypes.STRING())
> .build();
> ConnectorCatalogTable catalogTable = 
> ConnectorCatalogTable.source(source, true);
> tEnv.getCatalog(tEnv.getCurrentCatalog())
> .ifPresent(
> catalog -> {
> try {
> catalog.createTable(
> new 
> ObjectPath(tEnv.getCurrentDatabase(), "example"),
> catalogTable,
> false);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> });
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
>  
> When reading it with the new CSV source (file system source + CSV format), it 
> throws the following error:
> {code:java}
> @Test
> public void testNewCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> String ddl =
> "create table example ("
> + "name string"
> + ") with ("
> + "'connector' = 'filesystem',"
> + "'path' = 'example.csv',"
> + "'format' = 'csv',"
> + "'csv.array-element-delimiter' = '\n',"
> + "'csv.field-delimiter' = '|',"
> + "'csv.null-literal' = ''"
> + ")";
> tEnv.executeSql(ddl);
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.j

[jira] [Commented] (FLINK-26760) The new CSV source (file system source + CSV format) does not support reading files whose file encoding is not UTF-8

2022-04-04 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516747#comment-17516747
 ] 

Francesco Guardiani commented on FLINK-26760:
-

OK, just checked the TPC-DS spec:

> Comment: The data generated by dsdgen includes some international characters. 
> Examples of international characters are Ô and É. The database must preserve 
> these characters during loading and processing by using a character encoding 
> such as ISO/IEC 8859-1 that includes these characters.

from https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf

> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8
> 
>
> Key: FLINK-26760
> URL: https://issues.apache.org/jira/browse/FLINK-26760
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Lijie Wang
>Assignee: Arvid Heise
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: PerformanceTest.java, example.csv
>
>
> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8, but the legacy {{CsvTableSource}} 
> supports it.
> We provide an {{*example.csv*}} whose file encoding is {{{}ISO-8859-1{}}}.
> When reading it with the legacy {{{}CsvTableSource{}}}, it executes correctly:
> {code:java}
> @Test
> public void testLegacyCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> CsvTableSource.Builder builder = CsvTableSource.builder();
> CsvTableSource source =
> builder.path("example.csv")
> .emptyColumnAsNull()
> .lineDelimiter("\n")
> .fieldDelimiter("|")
> .field("name", DataTypes.STRING())
> .build();
> ConnectorCatalogTable catalogTable = 
> ConnectorCatalogTable.source(source, true);
> tEnv.getCatalog(tEnv.getCurrentCatalog())
> .ifPresent(
> catalog -> {
> try {
> catalog.createTable(
> new 
> ObjectPath(tEnv.getCurrentDatabase(), "example"),
> catalogTable,
> false);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> });
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
>  
> When reading it with the new CSV source (file system source + CSV format), it 
> throws the following error:
> {code:java}
> @Test
> public void testNewCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> String ddl =
> "create table example ("
> + "name string"
> + ") with ("
> + "'connector' = 'filesystem',"
> + "'path' = 'example.csv',"
> + "'format' = 'csv',"
> + "'csv.array-element-delimiter' = '\n',"
> + "'csv.field-delimiter' = '|',"
> + "'csv.null-literal' = ''"
> + ")";
> tEnv.executeSql(ddl);
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 

[jira] [Comment Edited] (FLINK-26760) The new CSV source (file system source + CSV format) does not support reading files whose file encoding is not UTF-8

2022-04-04 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516744#comment-17516744
 ] 

Francesco Guardiani edited comment on FLINK-26760 at 4/4/22 10:58 AM:
--

Just checked: the benchmark files do indeed contain some non-UTF-8 chars in 
the customer.dat file:
{code:java}
87620|EEGFBAAA|1451595|3530|15865|2451813|2451783|Sir|Eric|Hartley|N|4|8|1991|R�UNION||eric.hart...@vuzlvx.com|2452643|{code}


was (Author: slinkydeveloper):
Just checked: the benchmark files do indeed contain some non-UTF-8 chars in 
the customer.dat file:
87620|EEGFBAAA|1451595|3530|15865|2451813|2451783|Sir|Eric|Hartley|N|4|8|1991|R�UNION||eric.hart...@vuzlvx.com|2452643|

> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8
> 
>
> Key: FLINK-26760
> URL: https://issues.apache.org/jira/browse/FLINK-26760
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Lijie Wang
>Assignee: Arvid Heise
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: PerformanceTest.java, example.csv
>
>
> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8, but the legacy {{CsvTableSource}} 
> supports it.
> We provide an {{*example.csv*}} whose file encoding is {{{}ISO-8859-1{}}}.
> When reading it with the legacy {{{}CsvTableSource{}}}, it executes correctly:
> {code:java}
> @Test
> public void testLegacyCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> CsvTableSource.Builder builder = CsvTableSource.builder();
> CsvTableSource source =
> builder.path("example.csv")
> .emptyColumnAsNull()
> .lineDelimiter("\n")
> .fieldDelimiter("|")
> .field("name", DataTypes.STRING())
> .build();
> ConnectorCatalogTable catalogTable = 
> ConnectorCatalogTable.source(source, true);
> tEnv.getCatalog(tEnv.getCurrentCatalog())
> .ifPresent(
> catalog -> {
> try {
> catalog.createTable(
> new 
> ObjectPath(tEnv.getCurrentDatabase(), "example"),
> catalogTable,
> false);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> });
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
>  
> When reading it with the new CSV source (file system source + CSV format), it 
> throws the following error:
> {code:java}
> @Test
> public void testNewCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> String ddl =
> "create table example ("
> + "name string"
> + ") with ("
> + "'connector' = 'filesystem',"
> + "'path' = 'example.csv',"
> + "'format' = 'csv',"
> + "'csv.array-element-delimiter' = '\n',"
> + "'csv.field-delimiter' = '|',"
> + "'csv.null-literal' = ''"
> + ")";
> tEnv.executeSql(ddl);
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.

[jira] [Commented] (FLINK-26760) The new CSV source (file system source + CSV format) does not support reading files whose file encoding is not UTF-8

2022-04-04 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516744#comment-17516744
 ] 

Francesco Guardiani commented on FLINK-26760:
-

Just checked: the benchmark files do indeed contain some non-UTF-8 chars in 
the customer.dat file:
87620|EEGFBAAA|1451595|3530|15865|2451813|2451783|Sir|Eric|Hartley|N|4|8|1991|R�UNION||eric.hart...@vuzlvx.com|2452643|

> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8
> 
>
> Key: FLINK-26760
> URL: https://issues.apache.org/jira/browse/FLINK-26760
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Lijie Wang
>Assignee: Arvid Heise
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: PerformanceTest.java, example.csv
>
>
> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8, but the legacy {{CsvTableSource}} 
> supports it.
> We provide an {{*example.csv*}} whose file encoding is {{{}ISO-8859-1{}}}.
> When reading it with the legacy {{{}CsvTableSource{}}}, it executes correctly:
> {code:java}
> @Test
> public void testLegacyCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> CsvTableSource.Builder builder = CsvTableSource.builder();
> CsvTableSource source =
> builder.path("example.csv")
> .emptyColumnAsNull()
> .lineDelimiter("\n")
> .fieldDelimiter("|")
> .field("name", DataTypes.STRING())
> .build();
> ConnectorCatalogTable catalogTable = 
> ConnectorCatalogTable.source(source, true);
> tEnv.getCatalog(tEnv.getCurrentCatalog())
> .ifPresent(
> catalog -> {
> try {
> catalog.createTable(
> new 
> ObjectPath(tEnv.getCurrentDatabase(), "example"),
> catalogTable,
> false);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> });
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
>  
> When reading it with the new CSV source (file system source + CSV format), it 
> throws the following error:
> {code:java}
> @Test
> public void testNewCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> String ddl =
> "create table example ("
> + "name string"
> + ") with ("
> + "'connector' = 'filesystem',"
> + "'path' = 'example.csv',"
> + "'format' = 'csv',"
> + "'csv.array-element-delimiter' = '\n',"
> + "'csv.field-delimiter' = '|',"
> + "'csv.null-literal' = ''"
> + ")";
> tEnv.executeSql(ddl);
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoo

[jira] [Commented] (FLINK-26760) The new CSV source (file system source + CSV format) does not support reading files whose file encoding is not UTF-8

2022-04-04 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516708#comment-17516708
 ] 

Francesco Guardiani commented on FLINK-26760:
-

Another occurrence of this: I'm trying to convert our TPC-DS benchmarks to use 
the new CSV format, and they fail with the same cause: 
https://dev.azure.com/francescoguard/Flink/_build/results?buildId=871&view=logs&j=0e31ee24-31a6-528c-a4bf-45cde9b2a14e&t=ff03a8fa-e84e-5199-efb2-5433077ce8e2&l=3493

> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8
> 
>
> Key: FLINK-26760
> URL: https://issues.apache.org/jira/browse/FLINK-26760
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Lijie Wang
>Assignee: Arvid Heise
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: PerformanceTest.java, example.csv
>
>
> The new CSV source (file system source + CSV format) does not support reading 
> files whose file encoding is not UTF-8, but the legacy {{CsvTableSource}} 
> supports it.
> We provide an {{*example.csv*}} whose file encoding is {{{}ISO-8859-1{}}}.
> When reading it with the legacy {{{}CsvTableSource{}}}, it executes correctly:
> {code:java}
> @Test
> public void testLegacyCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> CsvTableSource.Builder builder = CsvTableSource.builder();
> CsvTableSource source =
> builder.path("example.csv")
> .emptyColumnAsNull()
> .lineDelimiter("\n")
> .fieldDelimiter("|")
> .field("name", DataTypes.STRING())
> .build();
> ConnectorCatalogTable catalogTable = 
> ConnectorCatalogTable.source(source, true);
> tEnv.getCatalog(tEnv.getCurrentCatalog())
> .ifPresent(
> catalog -> {
> try {
> catalog.createTable(
> new 
> ObjectPath(tEnv.getCurrentDatabase(), "example"),
> catalogTable,
> false);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> });
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
>  
> When reading it with the new CSV source (file system source + CSV format), it 
> throws the following error:
> {code:java}
> @Test
> public void testNewCsvSource() {
> EnvironmentSettings environmentSettings = 
> EnvironmentSettings.inBatchMode();
> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> String ddl =
> "create table example ("
> + "name string"
> + ") with ("
> + "'connector' = 'filesystem',"
> + "'path' = 'example.csv',"
> + "'format' = 'csv',"
> + "'csv.array-element-delimiter' = '\n',"
> + "'csv.field-delimiter' = '|',"
> + "'csv.null-literal' = ''"
> + ")";
> tEnv.executeSql(ddl);
> tEnv.executeSql("select count(name) from example").print();
> }
> {code}
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>   at 
> org.apache.flink

[jira] [Created] (FLINK-27043) Remove CSV connector test usages

2022-04-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27043:
---

 Summary: Remove CSV connector test usages
 Key: FLINK-27043
 URL: https://issues.apache.org/jira/browse/FLINK-27043
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26979) JsonFunctionsITCase.test failed on azure

2022-04-01 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26979:
---

Assignee: Francesco Guardiani

> JsonFunctionsITCase.test failed on azure
> 
>
> Key: FLINK-26979
> URL: https://issues.apache.org/jira/browse/FLINK-26979
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Mar 31 04:38:37 [ERROR] Tests run: 140, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 11.301 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.JsonFunctionsITCase
> Mar 31 04:38:37 [ERROR] 
> org.apache.flink.table.planner.functions.JsonFunctionsITCase.test(TestCase)[64]
>   Time elapsed: 0.738 s  <<< ERROR!
> Mar 31 04:38:37 java.lang.RuntimeException: Duplicate vertexID 452
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addNode(StreamGraph.java:504)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:403)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:382)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:63)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:825)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:555)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:846)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:804)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:555)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:316)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2153)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83)
> Mar 31 04:38:37   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:832)
> Mar 31 04:38:37   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
> Mar 31 04:38:37   at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:475)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Mar 31 04:38:37   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 31 04:38:37   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 31 04:38:37   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 31 04:38:37   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 31 04:38:37   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> Mar 31 04:38:37   at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> Mar 31 04:38:37   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)

[jira] [Commented] (FLINK-26979) JsonFunctionsITCase.test failed on azure

2022-04-01 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515876#comment-17515876
 ] 

Francesco Guardiani commented on FLINK-26979:
-

I assigned it to myself to investigate whether this has something to do with the new 
parallel test execution.

> JsonFunctionsITCase.test failed on azure
> 
>
> Key: FLINK-26979
> URL: https://issues.apache.org/jira/browse/FLINK-26979
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> Mar 31 04:38:37 [ERROR] Tests run: 140, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 11.301 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.JsonFunctionsITCase
> Mar 31 04:38:37 [ERROR] 
> org.apache.flink.table.planner.functions.JsonFunctionsITCase.test(TestCase)[64]
>   Time elapsed: 0.738 s  <<< ERROR!
> Mar 31 04:38:37 java.lang.RuntimeException: Duplicate vertexID 452
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addNode(StreamGraph.java:504)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:403)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:382)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:63)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:825)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:555)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:846)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:804)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:555)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:316)
> Mar 31 04:38:37   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2153)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83)
> Mar 31 04:38:37   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:832)
> Mar 31 04:38:37   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
> Mar 31 04:38:37   at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:475)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Mar 31 04:38:37   at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Mar 31 04:38:37   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 31 04:38:37   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 31 04:38:37   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 31 04:38:37   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 31 04:38:37   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> Mar 31 04:38:37   at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> Mar 31 04:38:37   at 
> org.junit

[jira] [Created] (FLINK-26988) Better error reporting when format fails

2022-04-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26988:
---

 Summary: Better error reporting when format fails
 Key: FLINK-26988
 URL: https://issues.apache.org/jira/browse/FLINK-26988
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Francesco Guardiani


Today when a format fails, depending on the format implementation, there might 
be no information about the split (file name, offset, etc.), making it hard to 
debug format failures. For example, here is what I've seen with the CSV format:


{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:531)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:227)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:841)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:767) 
~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.RuntimeException: Invalid UTF-8 middle byte 0x54 (at char 
#3529, byte #3528): check content encoding, does not look like UTF-8
    at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator._handleIOException(MappingIterator.java:417)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.hasNext(MappingIterator.java:190)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.formats.csv.CsvReaderFormat$Reader.read(CsvReaderFormat.java:194)
 ~[flink-csv-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.connector.file.src.impl.StreamFormatAdapter$Reader.readBatch(StreamFormatAdapter.java:214)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
 ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    a
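
A minimal sketch of the kind of wrapping that could address this; class and method names here are hypothetical and not the actual connector API:

{code:java}
import java.io.IOException;

// Hypothetical sketch: RecordReader and SplitAwareReader are illustrative names only.
final class SplitAwareReader<T> {

    interface RecordReader<R> {
        R read() throws IOException;
    }

    private final RecordReader<T> delegate;
    private final String splitPath;
    private final long splitOffset;

    SplitAwareReader(RecordReader<T> delegate, String splitPath, long splitOffset) {
        this.delegate = delegate;
        this.splitPath = splitPath;
        this.splitOffset = splitOffset;
    }

    T read() throws IOException {
        try {
            return delegate.read();
        } catch (IOException | RuntimeException e) {
            // Attach the split coordinates, so a root cause like the UTF-8 error above
            // would also say which file and offset produced the malformed input.
            throw new IOException(
                    "Failed to read split '" + splitPath + "' at offset " + splitOffset, e);
        }
    }
}
{code}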

[jira] [Commented] (FLINK-25234) Flink should parse ISO timestamp in UTC format

2022-03-31 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515360#comment-17515360
 ] 

Francesco Guardiani commented on FLINK-25234:
-

I think overall it makes sense to have a parse function for 
timestamps/intervals which can accept ISO-8601 timestamps and durations.
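
For reference, plain {{java.time}} already accepts the UTC-suffixed ISO-8601 string from the report, so such a parse function could delegate to it; a small standalone illustration:

{code:java}
import java.time.Instant;
import java.time.OffsetDateTime;

public class IsoParseExample {
    public static void main(String[] args) {
        // The value from the report; java.time handles the trailing 'Z' (UTC offset),
        // unlike the SQL TIMESTAMP parser that stops at index 23.
        OffsetDateTime odt = OffsetDateTime.parse("2021-12-08T12:59:57.028Z");
        Instant instant = Instant.parse("2021-12-08T12:59:57.028Z");
        System.out.println(odt);     // 2021-12-08T12:59:57.028Z
        System.out.println(instant); // 2021-12-08T12:59:57.028Z
    }
}
{code}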

> Flink should parse ISO timestamp in UTC format
> --
>
> Key: FLINK-25234
> URL: https://issues.apache.org/jira/browse/FLINK-25234
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>Affects Versions: 1.14.0
>Reporter: Egor Ryashin
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Error parsing timestamp with ISO-8601 format:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> java.time.format.DateTimeParseException: Text '2021-12-08T12:59:57.028Z' 
> could not be parsed, unparsed text found at index 23 {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25284) Support nulls in DataGen

2022-03-31 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-25284:
---

Assignee: Sergey Nuyanzin

> Support nulls in DataGen
> 
>
> Key: FLINK-25284
> URL: https://issues.apache.org/jira/browse/FLINK-25284
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> Currently it is impossible to specify that some values should sometimes be 
> null.
> It would be nice to have a property, something like {{null-rate}}, telling 
> how often a {{null}} value should be generated.
> Something like that:
> {code:sql}
> CREATE TABLE Orders (
> order_number STRING,
> priceDECIMAL(32,2),
> buyerROW,
> order_time   TIMESTAMP(3),
> my_map   MAP,
> my_arrrayARRAY
> ) WITH (
>'connector' = 'datagen',
>'fields.order_number.null-rate' = '0.7',
>'fields.price.null-rate' = '1.0',
>'fields.order_time.null-rate' = '0.5',
>'fields.buyer.id.null-rate' = '0.5',
>'fields.buyer.null-rate' = '0.5',
>'fields.my_map.key.null-rate' = '0.5',
>'fields.my_map.null-rate' = '0.5',
>'fields.my_array.element.null-rate' = '0.1',
>'fields.my_array.null-rate' = '0.5'
> );
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26327) throw not a literal exception in callContext.getArgumentValue when getTypeInference

2022-03-31 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515331#comment-17515331
 ] 

Francesco Guardiani commented on FLINK-26327:
-

Can you share the code of the Nvl class, or even better a small self-contained 
reproducer?
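
For context, {{CallContext#getArgumentValue}} is only meaningful for literal arguments, so custom type inference usually guards the call; a minimal sketch (simplified, not the reporter's actual code):

{code:java}
import java.util.Optional;
import org.apache.flink.table.types.inference.CallContext;

public class RoundXInferenceSketch {
    // Only read the second argument when it really is a (non-null) literal;
    // NVL(MAX(f0), 2) is an expression, so no literal value is available for it.
    static int resolveScale(CallContext callContext, int defaultScale) {
        if (callContext.isArgumentLiteral(1) && !callContext.isArgumentNull(1)) {
            Optional<Integer> value = callContext.getArgumentValue(1, Integer.class);
            return value.orElse(defaultScale);
        }
        return defaultScale;
    }
}
{code}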

> throw not a literal exception in callContext.getArgumentValue when 
> getTypeInference
> ---
>
> Key: FLINK-26327
> URL: https://issues.apache.org/jira/browse/FLINK-26327
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: Spongebob
>Priority: Major
>
>  
> {code:java}
> // code placeholder
> tableEnvironment.createTemporaryFunction("ROUNDX", RoundX.class);
> tableEnvironment.createTemporaryFunction("NVL", Nvl.class);
> tableEnvironment.executeSql("select ROUNDX( CAST(1.12345 as 
> decimal(10,3)),NVL(MAX(f0),2) ) from t1").print();
> // exception
> Exception in thread "main" java.lang.AssertionError: not a literal: NVL($0, 2)
> // trace
> // `NVL` is a scalar function similar to Oracle's nvl function. This 
> exception might be thrown from this code in my `getTypeInference` function 
> of the ROUNDX scalar function.
> Optional secondValue = callContext.getArgumentValue(1, 
> Integer.class);{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-26672) In Table/SQL API, Table configuration ' table.local-time-zone' does not work

2022-03-31 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani resolved FLINK-26672.
-
Release Note: I closed this issue as we cannot reproduce it and it might have 
been fixed on master already; please reopen if you have new findings about it.
  Resolution: Cannot Reproduce

> In Table/SQL API, Table configuration ' table.local-time-zone' does not work
> 
>
> Key: FLINK-26672
> URL: https://issues.apache.org/jira/browse/FLINK-26672
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.13.1
>Reporter: RuiLin-Zhang
>Priority: Major
> Fix For: 1.13.1
>
> Attachments: image-2022-03-17-14-08-16-351.png
>
>
> I created a table structure
> {color:#ff}tabEnv.getConfig.getConfiguration.setString("table.local-time-zone",
>  "Asia/Shanghai"){color}
> create table dwd_event_kafka (
> `platform` bigint,
> `ts` bigint,
> `event_ts` as to_timestamp_ltz(ts,3),
> `log_time` bigint,
> `event` integer,
> `page` integer,
> `guid` string,
> `goods_id` bigint,
> `third_class_id` bigint,
> `main_goods_id` bigint,
> `event_time` as to_timestamp_ltz(log_time,3),
> watermark for event_time as event_time - interval '5' second
> ) with (
> 'connector' = 'kafka',
> ...
> )
> > +I[101, 1647402695000, 2022-03-16T03:51:35Z, 1647402698149, 101, 131001, 
> > 22547286, 8005242, 471, 8005242, 2022-03-16T03:51:38.149Z]
> I found that the output data converted by the to_timestamp_ltz function is wrong. I 
> tried to configure {color:#ff}"table.local-time-zone"{color} and found it 
> didn't work.
>  
> In the Shanghai time zone, the timestamp (1647402695000) should be 
> "2022-03-16T11:51:35Z" instead of "2022-03-16T03:51:35Z".
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26949) Add NVL supported in SQL & Table API

2022-03-31 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515325#comment-17515325
 ] 

Francesco Guardiani commented on FLINK-26949:
-

Any particular reason for having this synonym, given we already have COALESCE 
and IFNULL?
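
For comparison, a small standalone snippet showing that the existing built-ins already cover the example from the description (a sketch only, not a test):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CoalesceIfNullExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Both existing built-ins already return 2 when the first argument is NULL.
        tEnv.executeSql(
                "SELECT COALESCE(CAST(NULL AS INT), 2), IFNULL(CAST(NULL AS INT), 2)")
            .print();
    }
}
{code}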

> Add NVL supported in SQL & Table API
> 
>
> Key: FLINK-26949
> URL: https://issues.apache.org/jira/browse/FLINK-26949
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.16.0
>
>
> Returns {{expr2}} if {{expr1}} is {{{}NULL{}}}, or {{expr1}} otherwise.
> Syntax:
> {code:java}
> nvl(expr1, expr2) {code}
> Arguments:
>  * {{{}expr1{}}}: An expression of any type.
>  * {{{}expr2{}}}: An expression that shares a least common type with 
> {{{}expr1{}}}.
> Returns:
> The result type is the least common type of the argument types.
> This function is a synonym for {{{}coalesce(expr1, expr2){}}}.
> Examples:
> {code:java}
> > SELECT nvl(NULL, 2);
>  2
> > SELECT nvl(3, 2);
>  3
>  {code}
> See more:
>  * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#nvl]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26946) Add SHIFTRIGHT supported in SQL & Table API

2022-03-31 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515319#comment-17515319
 ] 

Francesco Guardiani commented on FLINK-26946:
-

Isn't this a duplicate of https://issues.apache.org/jira/browse/FLINK-12450 ?

> Add SHIFTRIGHT supported in SQL & Table API
> ---
>
> Key: FLINK-26946
> URL: https://issues.apache.org/jira/browse/FLINK-26946
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Critical
> Fix For: 1.16.0
>
>
> Returns a bitwise signed integral number right shifted by {{n}} bits.
> Syntax:
> {code:java}
> shiftright(expr, n){code}
> Arguments:
>  * {{{}expr{}}}: An INTEGER or BIGINT expression.
>  * {{{}n{}}}: An INTEGER expression specifying the number of bits to shift.
> Returns:
> The result type matches {{{}expr{}}}.
> When {{expr}} is negative (that is, the highest order bit is set) the result 
> remains negative because the highest order bit is sticky. When {{n}} is 
> negative the result is 0.
> See more:
>  * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#shiftright]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]
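
The described semantics correspond to Java's arithmetic right shift, which may help illustrate them (plain Java, not the proposed SQL function):

{code:java}
public class ShiftRightSemantics {
    public static void main(String[] args) {
        // Sign-extending (arithmetic) right shift: the highest order bit is "sticky".
        System.out.println(8 >> 1);   // 4
        System.out.println(-8 >> 1);  // -4, the result stays negative
        // The proposal defines shiftright(expr, n) = 0 for negative n. Java's >>
        // masks the shift amount to 5 bits, so 8 >> -1 is evaluated as 8 >> 31
        // and happens to print 0 as well.
        System.out.println(8 >> -1);  // 0
    }
}
{code}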



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26952) Remove old CSV connector

2022-03-31 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26952:
---

 Summary: Remove old CSV connector
 Key: FLINK-26952
 URL: https://issues.apache.org/jira/browse/FLINK-26952
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This involves removing all references in our tests to the old CSV connector 
(mostly TableEnvironmentITCase, TableEnvironmentTest and TableITCase) and 
removing its usage from a couple of e2e tests.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26950) Remove TableSink and TableSource interfaces

2022-03-31 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26950:
---

 Summary: Remove TableSink and TableSource interfaces
 Key: FLINK-26950
 URL: https://issues.apache.org/jira/browse/FLINK-26950
 Project: Flink
  Issue Type: Technical Debt
Reporter: Francesco Guardiani


This issue involves removing the old TableSink and TableSource stack, replaced 
by the new DynamicTableSink and DynamicTableSource.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26704) Remove string expression DSL

2022-03-30 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26704:

Release Note: The deprecated String expression DSL has been removed from 
Java/Scala/Python Table API
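
A minimal before/after sketch of the migration this implies (hypothetical table and column names; only the Java expression DSL compiles after this change):

{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

public class ExpressionDslMigration {
    // Before (String DSL, no longer compiles): orders.select("order_number, price as p")
    // After (Java expression DSL):
    static Table project(Table orders) {
        return orders.select($("order_number"), $("price").as("p"));
    }
}
{code}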

> Remove string expression DSL
> 
>
> Key: FLINK-26704
> URL: https://issues.apache.org/jira/browse/FLINK-26704
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-21582) COALESCE not works when cast a variable return null

2022-03-22 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani resolved FLINK-21582.
-
Resolution: Fixed

> COALESCE not works when cast a variable return null
> ---
>
> Key: FLINK-21582
> URL: https://issues.apache.org/jira/browse/FLINK-21582
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
> Environment: Flink1.11.1
>  
>Reporter: Jerry
>Priority: Minor
>  Labels: auto-unassigned
> Fix For: 1.15.0
>
> Attachments: image-2021-03-03-19-48-21-306.png
>
>
> select COALESCE(cast('aa' as int), 0); returns NULL 
> select COALESCE(NULL, 0); returns 0 
> The first case fails, but the second case works.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-21582) COALESCE not works when cast a variable return null

2022-03-22 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510721#comment-17510721
 ] 

Francesco Guardiani commented on FLINK-21582:
-

I think this can be closed, as it has been resolved with 
https://issues.apache.org/jira/browse/FLINK-24385
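
As a quick illustration of the behavior after that rework (a sketch assuming a 1.15+ TableEnvironment, not a regression test):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CoalesceTryCastExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Since the casting rework, CAST('aa' AS INT) fails at runtime instead of
        // silently returning NULL; TRY_CAST is the NULL-returning variant, so the
        // COALESCE fallback kicks in as the reporter expected.
        tEnv.executeSql("SELECT COALESCE(TRY_CAST('aa' AS INT), 0)").print(); // 0
    }
}
{code}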

> COALESCE not works when cast a variable return null
> ---
>
> Key: FLINK-21582
> URL: https://issues.apache.org/jira/browse/FLINK-21582
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
> Environment: Flink1.11.1
>  
>Reporter: Jerry
>Priority: Minor
>  Labels: auto-unassigned
> Fix For: 1.15.0
>
> Attachments: image-2021-03-03-19-48-21-306.png
>
>
> select COALESCE(cast('aa' as int), 0); returns NULL 
> select COALESCE(NULL, 0); returns 0 
> The first case fails, but the second case works.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26764) Support RESPECT NULLS for FIRST_VALUE/LAST_VALUE

2022-03-21 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26764:
---

Assignee: luoyuxia

> Support RESPECT  NULLS for FIRST_VALUE/LAST_VALUE
> -
>
> Key: FLINK-26764
> URL: https://issues.apache.org/jira/browse/FLINK-26764
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>
> Flink supports the functions FIRST_VALUE/LAST_VALUE, but the behavior is always 
> to ignore null values.
> But 
> [Spark|https://spark.apache.org/docs/2.4.2/api/sql/index.html#first_value], 
> [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+windowingandanalytics],
>  
> [Oracle|https://docs.oracle.com/cd/B19306_01/server.102/b14200/functions057.htm],
>  
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/first_value.html],
>  etc., also support respecting nulls for FIRST_VALUE/LAST_VALUE.
> Should we also allow users to specify whether to ignore nulls?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-18286) Implement type inference for functions on composite types

2022-03-21 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-18286:
---

Assignee: Francesco Guardiani

> Implement type inference for functions on composite types
> -
>
> Key: FLINK-18286
> URL: https://issues.apache.org/jira/browse/FLINK-18286
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>
> We should implement type inference for functions such as AT/ITEM/ELEMENT/GET.
> Additionally we should make sure they are consistent in Table API & SQL.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-13786) Implement type inference for other functions

2022-03-21 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-13786:
---

Assignee: Francesco Guardiani

> Implement type inference for other functions
> 
>
> Key: FLINK-13786
> URL: https://issues.apache.org/jira/browse/FLINK-13786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Assignee: Francesco Guardiani
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-13785) Implement type inference for time functions

2022-03-21 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-13785:
---

Assignee: Francesco Guardiani

> Implement type inference for time functions
> ---
>
> Key: FLINK-13785
> URL: https://issues.apache.org/jira/browse/FLINK-13785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Assignee: Francesco Guardiani
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26782) Remove PlannerExpression and related

2022-03-21 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26782:
---

 Summary: Remove PlannerExpression and related
 Key: FLINK-26782
 URL: https://issues.apache.org/jira/browse/FLINK-26782
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26770) Nullable ArrayData should not be Object[]

2022-03-21 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26770:
---

Assignee: Francesco Guardiani

> Nullable ArrayData should not be Object[]
> -
>
> Key: FLINK-26770
> URL: https://issues.apache.org/jira/browse/FLINK-26770
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.15.0
>Reporter: Wenlong Lyu
>Assignee: Francesco Guardiani
>Priority: Major
>
> sql:
>  
> "INSERT INTO %s "
> + " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
> + "1,'dim',cast(20.2007 as 
> double),false,652482,cast('2020-07-08' as 
> date),'source_test',cast('2020-07-10 16:28:07.737' as timestamp),"
> + "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
> array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
> + "ARRAY[cast(8.58967 as float),cast(96.4667 as 
> float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
> double),cast(792343.646446 as double),cast(76.46464 as double)],"
> + "cast(ARRAY [true,true,false,true] as 
> array),cast(ARRAY ['monday','saturday','sunday'] as 
> array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
> 16:28:07.737' as timestamp)"
> + ")";
> error:
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast 
> to [Ljava.lang.Integer;
>   at 
> org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
>  ~[flink-table-common]
>   
> related codegen result:
>   objArray$81 = new Object[result$76.size()];
>   for ( i$82 = 0; i$82 < result$76.size(); i$82++) {
>   if (!result$76.isNullAt(i$82)) {
>   objArray$81[i$82] = result$76.getBoolean(i$82);
> cause:
>   ArrayToArrayCastRule#arrayElementType uses Object when a column is 
> nullable, but GenericArrayData only accepts arrays with specific
>  types, like Integer[]. I think we should follow 
> CodeGenUtils#boxedTypeTermForType 
> [~slinkydeveloper]
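
The root cause can be reproduced in isolation with plain table-common classes (a tiny illustration, unrelated to the actual generated code):

{code:java}
import org.apache.flink.table.data.GenericArrayData;

public class NullableArrayDataRepro {
    public static void main(String[] args) {
        // What the generated code effectively builds for a nullable INT ARRAY column:
        Object[] boxedAsObjects = new Object[] {464, 98661, null};
        GenericArrayData arrayData = new GenericArrayData(boxedAsObjects);
        // toIntArray() casts the backing array to Integer[], so an Object[] backing
        // array fails with the reported ClassCastException.
        arrayData.toIntArray();
    }
}
{code}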



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26764) Support RESPECT NULLS for FIRST_VALUE/LAST_VALUE

2022-03-21 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509683#comment-17509683
 ] 

Francesco Guardiani commented on FLINK-26764:
-

I think this is reasonable to include; we have a similar issue with OVER as 
well: https://issues.apache.org/jira/browse/FLINK-24499

> Support RESPECT  NULLS for FIRST_VALUE/LAST_VALUE
> -
>
> Key: FLINK-26764
> URL: https://issues.apache.org/jira/browse/FLINK-26764
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>
> Flink supports the functions FIRST_VALUE/LAST_VALUE, but the behavior is always 
> to ignore null values.
> But 
> [Spark|https://spark.apache.org/docs/2.4.2/api/sql/index.html#first_value], 
> [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+windowingandanalytics],
>  
> [Oracle|https://docs.oracle.com/cd/B19306_01/server.102/b14200/functions057.htm],
>  
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/first_value.html],
>  etc., also support respecting nulls for FIRST_VALUE/LAST_VALUE.
> Should we also allow users to specify whether to ignore nulls?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26704) Remove string expression DSL

2022-03-17 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26704:
---

 Summary: Remove string expression DSL
 Key: FLINK-26704
 URL: https://issues.apache.org/jira/browse/FLINK-26704
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26672) In Table/SQL API, Table configuration ' table.local-time-zone' does not work

2022-03-16 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507409#comment-17507409
 ] 

Francesco Guardiani commented on FLINK-26672:
-

This might be an issue of the to-string converter, which we resolved on master. 
Can you try this test with a nightly release? Or, if you wait a couple of days, 
a 1.15 RC should be released and you can try with that as well.
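
For completeness, the programmatic equivalent of that configuration on recent versions looks roughly like this (a sketch, assuming the standard TableEnvironment API):

{code:java}
import java.time.ZoneId;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LocalTimeZoneConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Equivalent to setting 'table.local-time-zone'; affects how TIMESTAMP_LTZ
        // values (e.g. the result of TO_TIMESTAMP_LTZ) are rendered in the session.
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    }
}
{code}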

> In Table/SQL API, Table configuration ' table.local-time-zone' does not work
> 
>
> Key: FLINK-26672
> URL: https://issues.apache.org/jira/browse/FLINK-26672
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.13.1
>Reporter: RuiLin-Zhang
>Priority: Major
> Fix For: 1.13.1
>
>
> I created a table structure
> {color:#ff}tabEnv.getConfig.getConfiguration.setString("table.local-time-zone",
>  "Asia/Shanghai"){color}
> create table dwd_event_kafka (
> `platform` bigint,
> `ts` bigint,
> `event_ts` as to_timestamp_ltz(ts,3),
> `log_time` bigint,
> `event` integer,
> `page` integer,
> `guid` string,
> `goods_id` bigint,
> `third_class_id` bigint,
> `main_goods_id` bigint,
> `event_time` as to_timestamp_ltz(log_time,3),
> watermark for event_time as event_time - interval '5' second
> ) with (
> 'connector' = 'kafka',
> ...
> )
> > +I[101, 1647402695000, 2022-03-16T03:51:35Z, 1647402698149, 101, 131001, 
> > 22547286, 8005242, 471, 8005242, 2022-03-16T03:51:38.149Z]
> I found that the output data converted by the to_timestamp_ltz function is wrong. I 
> tried to configure {color:#ff}"table.local-time-zone"{color} and found it 
> didn't work.
>  
> In the Shanghai time zone, the timestamp (1647402695000) should be 
> "2022-03-16T11:51:35Z" instead of "2022-03-16T03:51:35Z".
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25090) Run the assertj conversion script to convert assertions

2022-03-15 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-25090:
---

Assignee: (was: Francesco Guardiani)

> Run the assertj conversion script to convert assertions
> ---
>
> Key: FLINK-25090
> URL: https://issues.apache.org/jira/browse/FLINK-25090
> Project: Flink
>  Issue Type: Bug
>Reporter: Francesco Guardiani
>Priority: Major
>
> See https://assertj.github.io/doc/#assertj-migration-using-regexes for more 
> details
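
A typical before/after produced by that conversion (illustrative only; the script itself is the set of regex replacements linked above):

{code:java}
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;

public class AssertjMigrationExample {
    public static void main(String[] args) {
        List<String> formats = Arrays.asList("csv", "json");
        // Before (JUnit): assertEquals(2, formats.size()); assertTrue(formats.contains("csv"));
        // After (AssertJ):
        assertThat(formats).hasSize(2).contains("csv");
    }
}
{code}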



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507003#comment-17507003
 ] 

Francesco Guardiani edited comment on FLINK-26549 at 3/15/22, 3:54 PM:
---

Per https://issues.apache.org/jira/browse/CALCITE-5046, our issue seems to be caused 
by an old Calcite bug which has been fixed in recent releases 
(https://issues.apache.org/jira/browse/CALCITE-4603, in 1.27+). The next Calcite 
bump should fix it. To verify, I've attached two patches: one testing the plan 
of an INSERT INTO VALUES query, and the other testing the specific type factory 
method (and also converting the test to Java):
 * [^InsertIntoValuesTest.patch]
 * [^FlinkTypeFactoryTest.patch]


was (Author: slinkydeveloper):
Per https://issues.apache.org/jira/browse/CALCITE-5046, our issue seems to be caused 
by an old Calcite bug which has been fixed in recent releases (1.27+). The 
next Calcite bump should fix it. To verify, I've attached two patches: one 
testing the plan of an INSERT INTO VALUES query, and the other testing the 
specific type factory method (and also converting the test to Java):
 * [^InsertIntoValuesTest.patch]
 * [^FlinkTypeFactoryTest.patch]

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
> Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, 
> Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 

[jira] [Comment Edited] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507003#comment-17507003
 ] 

Francesco Guardiani edited comment on FLINK-26549 at 3/15/22, 3:53 PM:
---

Per https://issues.apache.org/jira/browse/CALCITE-5046, our issue seems to be caused 
by an old Calcite bug which has been fixed in recent releases (1.27+). The 
next Calcite bump should fix it. To verify, I've attached two patches: one 
testing the plan of an INSERT INTO VALUES query, and the other testing the 
specific type factory method (and also converting the test to Java):
 * [^InsertIntoValuesTest.patch]
 * [^FlinkTypeFactoryTest.patch]


was (Author: slinkydeveloper):
Per https://issues.apache.org/jira/browse/CALCITE-5046, our issue seems to be caused 
by an old Calcite bug which has been fixed in recent releases. The next 
Calcite bump should fix it. To verify, I've attached two patches: one testing 
the plan of an INSERT INTO VALUES query, and the other testing the specific 
type factory method (and also converting the test to Java):
 * [^InsertIntoValuesTest.patch]
 * [^FlinkTypeFactoryTest.patch]

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
> Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, 
> Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY

[jira] [Commented] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507003#comment-17507003
 ] 

Francesco Guardiani commented on FLINK-26549:
-

Per https://issues.apache.org/jira/browse/CALCITE-5046, our issue seems to be caused 
by an old Calcite bug which has been fixed in recent releases. The next 
Calcite bump should fix it. To verify, I've attached two patches: one testing 
the plan of an INSERT INTO VALUES query, and the other testing the specific 
type factory method (and also converting the test to Java):
 * [^InsertIntoValuesTest.patch]
 * [^FlinkTypeFactoryTest.patch]

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
> Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, 
> Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), 
> _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT 
> NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>:- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
> physical_3, null:(VARCHAR(2147483647) CHAR

[jira] [Updated] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26549:

Attachment: FlinkTypeFactoryTest.patch

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
> Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, 
> Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), 
> _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT 
> NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>:- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
> physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>+- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
> physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
> X'20':BINARY(1)) AS (VARCHAR(2147483647) CHA

[jira] [Updated] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26549:

Attachment: InsertIntoValuesTest.patch

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
> Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, 
> Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), 
> _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT 
> NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>:- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
> physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>+- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
> physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
> X'20':BINARY(1)) AS (VARCHAR(2147483647) CHA

[jira] [Updated] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26549:

Attachment: Least_restrictive_issue.patch

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
> Attachments: Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP<STRING, BYTES> METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), 
> _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT 
> NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>:- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
> physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>+- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
> physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
> X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP) AS

[jira] [Assigned] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-15 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26549:
---

Assignee: Francesco Guardiani

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> While working on casting, I've found out we have an interesting bug in the 
> insert values type inference. This comes from the 
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
> particular 
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some 
> metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP<STRING, BYTES> METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>'connector' = 'kafka',
>[...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
> x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
> X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the 
> last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], 
> fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
> headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], 
> timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>+- LogicalUnion(all=[true])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
> EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
> EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
> EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP], EXPR$4=[false])
>   :  +- LogicalValues(tuples=[[{ 0 }]])
>   +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
> EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
> X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>  +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
> physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
> timestamp])
>:- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
> physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), 
> _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT 
> NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>:- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
> physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
> 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>+- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
> physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
> X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
> VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 
> 12:12:11.123:TIMESTAMP_WITH_LOC

[jira] [Assigned] (FLINK-26467) Compile RowDataToStringConverter lazily

2022-03-14 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26467:
---

Assignee: Francesco Guardiani

> Compile RowDataToStringConverter lazily
> ---
>
> Key: FLINK-26467
> URL: https://issues.apache.org/jira/browse/FLINK-26467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>
> Currently, we prepare for `print()` whenever `sqlQuery` is called. However, 
> we could postpone the compilation until it is really needed.
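A minimal sketch of the memoizing-holder pattern such a change could use; this is purely illustrative ({{LazyValue}} and its wiring are hypothetical, not the actual planner code):

{code:java}
// Hypothetical sketch: defer an expensive compilation step until first use.
// The RowDataToStringConverter itself is only referenced by name in the issue above.
import java.util.function.Supplier;

final class LazyValue<T> {
    private final Supplier<T> factory;
    private volatile T value;

    LazyValue(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        // Double-checked locking: compile at most once, and only when really needed.
        T result = value;
        if (result == null) {
            synchronized (this) {
                result = value;
                if (result == null) {
                    result = factory.get();
                    value = result;
                }
            }
        }
        return result;
    }
}
{code}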



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-24422) When creating a TIMESTAMP_LTZ only allow Instant

2022-03-14 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-24422:
---

Assignee: (was: Francesco Guardiani)

> When creating a TIMESTAMP_LTZ only allow Instant
> 
>
> Key: FLINK-24422
> URL: https://issues.apache.org/jira/browse/FLINK-24422
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Marios Trivyzas
>Priority: Major
>
> Currently we also accept *LocalDateTime* objects:
> *.from(TIMESTAMP_LTZ(), 
> LocalDateTime.parse("2021-09-24T12:34:56.123456")).resultsIn(LocalDate.of(2021,
>  9, 24))*



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26582) Run the assertj conversion script to convert assertions in flink-table

2022-03-10 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26582:
---

 Summary: Run the assertj conversion script to convert assertions 
in flink-table
 Key: FLINK-26582
 URL: https://issues.apache.org/jira/browse/FLINK-26582
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25972) org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause class leaks

2022-03-10 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani closed FLINK-25972.
---
Resolution: Not A Problem

> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might 
> cause class leaks
> --
>
> Key: FLINK-25972
> URL: https://issues.apache.org/jira/browse/FLINK-25972
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Major
>
> JaninoRelMetadataProvider -> static field HANDLERS -> generated class 
> GeneratedMetadataHandler_ColumnNullCount loaded by 
> org.codehaus.janino.ByteArrayClassLoader. This cache seems to use the strong 
> references.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25968) Possible class leaks in flink-table / sql modules

2022-03-10 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani closed FLINK-25968.
---
Release Note: The leaks have been removed
  Resolution: Fixed

> Possible class leaks in flink-table / sql modules
> -
>
> Key: FLINK-25968
> URL: https://issues.apache.org/jira/browse/FLINK-25968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Blocker
> Fix For: 1.15.0
>
>
> This is the umbrella issue for possible class leaks in the flink-table / sql 
> planner and runtime modules.
> Currently, for a Flink cluster, the Flink classes are loaded by the system 
> ClassLoader while each job has separate user ClassLoaders. In this case, if 
> some class loaded by the system ClassLoader has static variables that 
> reference classes loaded by the user ClassLoaders, the user ClassLoaders 
> cannot be released, which might cause class leaks. 
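For illustration, a minimal sketch of the leak pattern described above, with hypothetical class names: a static cache held by a class on the system ClassLoader keeps strong references to classes loaded by a per-job user ClassLoader, so that ClassLoader can never be collected.

{code:java}
// Hypothetical illustration of the leak pattern; not actual Flink code.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class StaticCodegenCache {
    // Loaded by the system ClassLoader, so this map lives for the lifetime of the cluster.
    private static final Map<String, Class<?>> CACHE = new ConcurrentHashMap<>();

    public static Class<?> getOrLoad(String name, ClassLoader userClassLoader) throws Exception {
        // The cached Class keeps its defining (user) ClassLoader strongly reachable,
        // so the per-job ClassLoader and everything it loaded can never be unloaded.
        Class<?> clazz = CACHE.get(name);
        if (clazz == null) {
            clazz = Class.forName(name, true, userClassLoader);
            CACHE.put(name, clazz);
        }
        return clazz;
    }
}
{code}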



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26553) Enable scalafmt for scala codebase

2022-03-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26553:
---

 Summary: Enable scalafmt for scala codebase
 Key: FLINK-26553
 URL: https://issues.apache.org/jira/browse/FLINK-26553
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


As discussed in 
https://lists.apache.org/thread/97398pc9cb8y922xlb6mzlsbjtjf5jnv, we should 
enable scalafmt in our codebase



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26551) Make the legacy behavior disabled by default

2022-03-09 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26551:
---

Assignee: Francesco Guardiani

> Make the legacy behavior disabled by default
> 
>
> Key: FLINK-26551
> URL: https://issues.apache.org/jira/browse/FLINK-26551
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> Followup of https://issues.apache.org/jira/browse/FLINK-25111
> For the discussion, see 
> https://lists.apache.org/thread/r13y3plwwyg3sngh8cz47flogq621txv



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26551) Make the legacy behavior disabled by default

2022-03-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26551:
---

 Summary: Make the legacy behavior disabled by default
 Key: FLINK-26551
 URL: https://issues.apache.org/jira/browse/FLINK-26551
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


Followup of https://issues.apache.org/jira/browse/FLINK-25111

For the discussion, see 
https://lists.apache.org/thread/r13y3plwwyg3sngh8cz47flogq621txv



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-09 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503513#comment-17503513
 ] 

Francesco Guardiani commented on FLINK-26549:
-

Another interesting situation: if I modify the map of the third row as 
follows:

{code:sql}
INSERT INTO kafka VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
x'BABE'], TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'102030', 'k2', 
X'203040'], TRUE)
{code}


I get the following plan:

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP], 
timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
  :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
  :  +- LogicalValues(tuples=[[{ 0 }]])
  :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP], EXPR$4=[false])
  :  +- LogicalValues(tuples=[[{ 0 }]])
  +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
X'102030':BINARY(3), _UTF-16LE'k2', X'203040':BINARY(3))], EXPR$4=[true])
 +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', 
X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(3) 
NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
physical_3, CAST(MAP(_UTF-16LE'k1', X'102030':BINARY(3), _UTF-16LE'k2', 
X'203040':BINARY(3)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, 
CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(3)) MAP) AS 
(VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
   :- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS 
physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, 
CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS 
timestamp])
   :  +- Reused(reference_id=[1])
   +- Calc(select=['data 3' AS physical_1, 3 AS physical_2, true AS physical_3, 
CAST(MAP('k1', X'102030', 'k2', X'203040') AS (VARCHAR(2147483647), 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123 AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
  +- Reused(reference_id=[1])
{code}

This is again wrong, as it now forces a cast to {{BINARY(3)}}, causing the 
value {{x'BABE'}} to be padded.
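As a side note, a small sketch of how the merged value type of the two conflicting map literals can be inspected with the planner's logical-type utilities; the expected outcome noted in the comment ({{VARBINARY(3)}} rather than {{BINARY(3)}}) is my reading of the desired behavior, not what the current planner produces:

{code:java}
// Sketch only: which common type do the logical type utilities derive for the
// conflicting map value types (VARBINARY(3) from row 1 vs. BINARY(3) from row 3)?
// A variable-length VARBINARY(3) would avoid padding X'BABE' to 3 bytes.
import java.util.Arrays;
import java.util.Optional;

import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;

public class MapValueCommonTypeSketch {
    public static void main(String[] args) {
        Optional<LogicalType> common =
                LogicalTypeMerging.findCommonType(
                        Arrays.asList(new VarBinaryType(3), new BinaryType(3)));
        // One would expect VARBINARY(3) here; the reported bug is that BINARY(3) wins.
        System.out.println(common);
    }
}
{code}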

> INSERT INTO with VALUES leads to wrong type inference with nested types
> ---
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/brow

[jira] [Updated] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-09 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26549:

Description: 
While working on casting, I've found out we have an interesting bug in the 
insert values type inference. This comes from the 
{{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
particular 
https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).

The test scenario is an INSERT INTO VALUES query which is also pushing some 
metadata to a Kafka table, in particular is writing the headers metadata.

The table is declared like that:

{code:sql}
 CREATE TABLE kafka (
  `physical_1` STRING,
  `physical_2` INT,
  `timestamp-type` STRING METADATA VIRTUAL,
  `timestamp` TIMESTAMP(3) METADATA,
  `leader-epoch` INT METADATA VIRTUAL,
  `headers` MAP<STRING, BYTES> METADATA,
  `partition` INT METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
  `physical_3` BOOLEAN
) WITH (
   'connector' = 'kafka',
   [...]
)
{code}

The insert into query looks like:

{code:sql}
INSERT INTO kafka VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
x'BABE'], TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
X'20'], TRUE)
{code}

Note that in the first row, the byte literal is of length 3, while in the last 
row the byte literal is of length 1.

The generated plan of this INSERT INTO is:

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP], 
timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
  :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
  :  +- LogicalValues(tuples=[[{ 0 }]])
  :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP], EXPR$4=[false])
  :  +- LogicalValues(tuples=[[{ 0 }]])
  +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
 +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', 
X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) 
NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, 
CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(1)) MAP) AS 
(VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(

[jira] [Created] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26549:
---

 Summary: INSERT INTO with VALUES leads to wrong type inference 
with nested types
 Key: FLINK-26549
 URL: https://issues.apache.org/jira/browse/FLINK-26549
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


While working on casting, I've found out we have an interesting bug in the 
insert values type inference. This comes from the 
{{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in 
particular 
https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).

The test scenario is an INSERT INTO VALUES query which is also pushing some 
metadata to a Kafka table, in particular is writing the headers metadata.

The table is declared like that:

{code:sql}
 CREATE TABLE kafka (
  `physical_1` STRING,
  `physical_2` INT,
  `timestamp-type` STRING METADATA VIRTUAL,
  `timestamp` TIMESTAMP(3) METADATA,
  `leader-epoch` INT METADATA VIRTUAL,
  `headers` MAP<STRING, BYTES> METADATA,
  `partition` INT METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
  `physical_3` BOOLEAN
) WITH (
   'connector' = 'kafka',
   [...]
)
{code}

The insert into query looks like:

{code:sql}
INSERT INTO kafka VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', 
x'BABE'], TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', 
X'20'], TRUE)
{code}

Note that in the first row, the byte literal is of length 3, while in the last 
row the byte literal is of length 1.

The generated plan of this INSERT INTO is:

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], 
headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP], 
timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
  :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], 
EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
  :  +- LogicalValues(tuples=[[{ 0 }]])
  :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], 
EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], 
EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP], EXPR$4=[false])
  :  +- LogicalValues(tuples=[[{ 0 }]])
  +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], 
EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', 
X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
 +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS 
physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', 
X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) 
NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS 
physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS 
physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', 
X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 
12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, 
physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, 
timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, 
CAST(CAS

[jira] [Created] (FLINK-26520) Implement SEARCH operator

2022-03-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26520:
---

 Summary: Implement SEARCH operator
 Key: FLINK-26520
 URL: https://issues.apache.org/jira/browse/FLINK-26520
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


The codegen currently does not implement the SEARCH operator, but uses 
the rex builder to circumvent it. We should implement the SEARCH operator 
directly, to remove the usage of the Flink type factory.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26519) Remove FlinkTypeFactory.INSTANCE singleton

2022-03-07 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26519:

Issue Type: Technical Debt  (was: Bug)

> Remove FlinkTypeFactory.INSTANCE singleton
> --
>
> Key: FLINK-26519
> URL: https://issues.apache.org/jira/browse/FLINK-26519
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> This singleton is not correct, as the user classloader needs to be passed for 
> the creation of structured types.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26519) Remove FlinkTypeFactory.INSTANCE singleton

2022-03-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26519:
---

 Summary: Remove FlinkTypeFactory.INSTANCE singleton
 Key: FLINK-26519
 URL: https://issues.apache.org/jira/browse/FLINK-26519
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This singleton is not correct, as the user classloader needs to be passed for 
the creation of structured types.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25971) org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE might cause class leaks

2022-03-07 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani closed FLINK-25971.
---
Resolution: Not A Problem

> org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE
>  might cause class leaks
> -
>
> Key: FLINK-25971
> URL: https://issues.apache.org/jira/browse/FLINK-25971
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>
> It might need a double check if 
> JsonSerdeUtil#OBJECT_MAPPER_INSTANCE -> DefaultDeserializationContext 
> _deserializationContext -> extends DeserializationContext -> 
> DeserializerCache _cache -> _cachedDeserializers / _incompleteDeserializers 
> might hold user ClassLoaders



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25972) org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause class leaks

2022-03-03 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500666#comment-17500666
 ] 

Francesco Guardiani edited comment on FLINK-25972 at 3/3/22, 11:15 AM:
---

This seems to be fine; look at the method {{JaninoRelMetadataHandler#compile}}: 
compiling the generated class always creates a new classloader (this is the way 
{{SimpleCompiler}} from Janino works), and this classloader uses 
{{JaninoRexCompiler.class.getClassLoader()}} as its parent classloader, which 
should be the {{AppClassLoader}}. So I don't see the leak here. Does it make 
sense? Also [~twalthr] 


was (Author: slinkydeveloper):
This seems to be fine, look at the method {{JaninoRelMetadataHandler#compile}}: 
The generated class always generates a classloader, and this classloader will 
use {{JaninoRexCompiler.class.getClassLoader()}} as parent classloader, which 
should be the {{AppClassLoader}}. So I don't see the leak here. Does it makes 
sense? Also [~twalthr] 

> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might 
> cause class leaks
> --
>
> Key: FLINK-25972
> URL: https://issues.apache.org/jira/browse/FLINK-25972
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Major
>
> JaninoRelMetadataProvider -> static field HANDLERS -> generated class 
> GeneratedMetadataHandler_ColumnNullCount loaded by 
> org.codehaus.janino.ByteArrayClassLoader. This cache seems to use the strong 
> references.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25972) org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause class leaks

2022-03-03 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500666#comment-17500666
 ] 

Francesco Guardiani commented on FLINK-25972:
-

This seems to be fine; look at the method {{JaninoRelMetadataHandler#compile}}: 
compiling the generated class always creates a new classloader, and this 
classloader uses {{JaninoRexCompiler.class.getClassLoader()}} as its parent 
classloader, which should be the {{AppClassLoader}}. So I don't see the leak 
here. Does it make sense? Also [~twalthr] 
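For reference, a minimal sketch of the compilation pattern described in the comment, assuming the plain {{org.codehaus.janino}} API; this is an illustration, not the Calcite source:

{code:java}
// Hypothetical sketch of the Janino compilation pattern mentioned above.
import org.codehaus.janino.SimpleCompiler;

public final class JaninoParentSketch {
    public static Class<?> compile(String source, String className) throws Exception {
        SimpleCompiler compiler = new SimpleCompiler();
        // The parent is a fixed, application-level classloader (e.g. the one that loaded
        // JaninoRexCompiler), not a per-job user classloader, so caching the generated
        // handler classes should not pin user classloaders.
        compiler.setParentClassLoader(JaninoParentSketch.class.getClassLoader());
        compiler.cook(source); // compiles into a fresh classloader for the generated code
        return compiler.getClassLoader().loadClass(className);
    }
}
{code}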

> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might 
> cause class leaks
> --
>
> Key: FLINK-25972
> URL: https://issues.apache.org/jira/browse/FLINK-25972
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Major
>
> JaninoRelMetadataProvider -> static field HANDLERS -> generated class 
> GeneratedMetadataHandler_ColumnNullCount loaded by 
> org.codehaus.janino.ByteArrayClassLoader. This cache seems to use the strong 
> references.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26463) TableEnvironmentITCase should use MiniCluster

2022-03-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26463:
---

 Summary: TableEnvironmentITCase should use MiniCluster
 Key: FLINK-26463
 URL: https://issues.apache.org/jira/browse/FLINK-26463
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26422) Update Chinese documentation with the new TablePipeline docs

2022-03-02 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26422:
---

Assignee: zoucao

> Update Chinese documentation with the new TablePipeline docs
> 
>
> Key: FLINK-26422
> URL: https://issues.apache.org/jira/browse/FLINK-26422
> Project: Flink
>  Issue Type: Bug
>  Components: chinese-translation, Documentation
>Reporter: Francesco Guardiani
>Assignee: zoucao
>Priority: Major
>  Labels: chinese-translation
>
> Chinese docs needs to be updated with the content of this commit: 
> https://github.com/apache/flink/commit/4f65c7950f2c3ef849f2094deab0e199ffedf57b



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26422) Update Chinese documentation with the new TablePipeline docs

2022-03-01 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26422:

Component/s: chinese-translation

> Update Chinese documentation with the new TablePipeline docs
> 
>
> Key: FLINK-26422
> URL: https://issues.apache.org/jira/browse/FLINK-26422
> Project: Flink
>  Issue Type: Bug
>  Components: chinese-translation, Documentation
>Reporter: Francesco Guardiani
>Priority: Major
>  Labels: chinese-translation
>
> Chinese docs needs to be updated with the content of this commit: 
> https://github.com/apache/flink/commit/4f65c7950f2c3ef849f2094deab0e199ffedf57b



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25416) Build unified Parquet BulkFormat for both Table API and DataStream API

2022-03-01 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-25416:

Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Build unified Parquet BulkFormat for both Table API and DataStream API
> --
>
> Key: FLINK-25416
> URL: https://issues.apache.org/jira/browse/FLINK-25416
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Major
>
> *Background information*
> Current AvroParquet implementation AvroParquetRecordFormat uses the high 
> level API ParquetReader that does not provide offset information, which turns 
> out the restoreReader logic has big room to improve.
> Beyond AvroParquetRecordFormat there is another format implementation 
> ParquetVectorizedInputFormat w.r.t. the parquet which is coupled tightly with 
> the Table API.
> It would be better to provide an unified Parquet BulkFormat with one 
> implementation that can support both Table API and DataStream API.
>  
> *Some thoughts*
> Use the low level API {{ParquetFileReader}} with {{BulkFormat}} directly like 
> 'ParquetVectorizedInputFormat' did instead of with {{StreamFormat}} for the 
> following reasons:
>  * the read logic is built in the internal low level class 
> {{InternalParquetRecordReader}} with package private visibility in 
> parquet-hadoop lib which uses another low level class {{ParquetFileReader}} 
> internally. This makes the implementation of StreamFormat very complicated. I 
> think the design idea of StreamFormat is to simplify the implementation. They 
> do not seem to work together.
>  * {{ParquetFileReader}} reads data in batch mode, i.e. {{PageReadStore 
> pages = reader.readNextFilteredRowGroup();}}. If we build this logic into 
> StreamFormat ({{AvroParquetRecordFormat}} in this case), 
> {{AvroParquetRecordFormat}} has to take over the role 
> {{InternalParquetRecordReader}} plays, including but not limited to
>  ## read {{PageReadStore}} in batch mode.
>  ## manage {{PageReadStore}}, i.e. read the next page when all records in the 
> current page have been consumed and cache it.
>  ## manage the read index within the current {{PageReadStore}} because 
> StreamFormat has its own setting for read size, etc.
> All of these make {{AvroParquetRecordFormat}} become the {{BulkFormat}} 
> instead of {{StreamFormat}}
>  * {{StreamFormat}} can only be used via {{StreamFormatAdapter}}, which 
> means everything we will do with the low-level APIs of the parquet-hadoop lib 
> should not conflict with the built-in logic provided by 
> {{StreamFormatAdapter}}.
> Now we can see that if we build this logic into a {{StreamFormat}} 
> implementation, i.e. {{AvroParquetRecordFormat}}, all the convenient built-in 
> logic provided by the {{StreamFormatAdapter}} turns into an obstacle. There is 
> also a violation of the single responsibility principle, i.e. 
> {{AvroParquetRecordFormat}} would take over some of the responsibility of 
> {{BulkFormat}}. These might be the reasons why 
> 'ParquetVectorizedInputFormat' implemented {{BulkFormat}} instead of 
> {{StreamFormat}}.
> In order to build a unified parquet implementation for both Table API and 
> DataStream API, it makes more sense to consider building this code into a 
> {{BulkFormat}} implementation class. Since the output data types are 
> different, {{RowData}} vs. {{Avro}}, extra converter logic should be 
> introduced into the architecture design. Depending on how complicated the 
> issue will be and how big an impact it will have on the current code base, a 
> new FLIP might be required. 
> The following code snippet was suggested by Arvid Heise for the next optimized 
> AvroParquetReader:
> {code:java}
> // Injected
> GenericData model = GenericData.get();
> org.apache.hadoop.conf.Configuration conf = new 
> org.apache.hadoop.conf.Configuration();
> // Low level reader - fetch metadata
> ParquetFileReader reader = null;
> MessageType fileSchema = reader.getFileMetaData().getSchema();
> Map<String, String> metaData = 
> reader.getFileMetaData().getKeyValueMetaData();
> // init Avro specific things
> AvroReadSupport readSupport = new AvroReadSupport<>(model);
> ReadSupport.ReadContext readContext =
> readSupport.init(
> new InitContext(
>   conf,
> metaData.entrySet().stream()
> .collect(Collectors.toMap(e -> 
> e.getKey(), e -> Collections.singleton(e.getValue(,
>   

[jira] [Created] (FLINK-26422) Update Chinese documentation with the new TablePipeline docs

2022-03-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26422:
---

 Summary: Update Chinese documentation with the new TablePipeline 
docs
 Key: FLINK-26422
 URL: https://issues.apache.org/jira/browse/FLINK-26422
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Francesco Guardiani


Chinese docs needs to be updated with the content of this commit: 
https://github.com/apache/flink/commit/4f65c7950f2c3ef849f2094deab0e199ffedf57b



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26131) CompiledPlan should implement Executable

2022-02-22 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26131:
---

Assignee: Francesco Guardiani

> CompiledPlan should implement Executable
> 
>
> Key: FLINK-26131
> URL: https://issues.apache.org/jira/browse/FLINK-26131
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> In order to do that, we need to keep a reference of TableEnvironment within 
> the implementation of CompiledPlan. We should also check that the 
> TableEnvironment matches
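Purely as an illustration of the shape this could take (all names below are hypothetical, not the actual Table API):

{code:java}
// Hypothetical sketch of a CompiledPlan implementation keeping its owning TableEnvironment.
final class CompiledPlanImpl {
    private final Object creatorTableEnvironment; // environment that compiled/loaded this plan

    CompiledPlanImpl(Object creatorTableEnvironment) {
        this.creatorTableEnvironment = creatorTableEnvironment;
    }

    // Executable-style entry point: the plan executes on the environment it was created with,
    // after checking that the caller's environment is the same instance.
    Object execute(Object callerTableEnvironment) {
        if (callerTableEnvironment != null && callerTableEnvironment != creatorTableEnvironment) {
            throw new IllegalArgumentException(
                    "This CompiledPlan was created by a different TableEnvironment");
        }
        return executeInternal();
    }

    private Object executeInternal() {
        throw new UnsupportedOperationException("execution elided in this sketch");
    }
}
{code}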



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26280) Add a flag to disable uid generation

2022-02-22 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26280:
---

Assignee: Francesco Guardiani

> Add a flag to disable uid generation
> 
>
> Key: FLINK-26280
> URL: https://issues.apache.org/jira/browse/FLINK-26280
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> We should add a flag to disable uid generation for back-compat. See the 
> discussion here: https://issues.apache.org/jira/browse/FLINK-25932



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25969) org.apache.flink.table.runtime.generated.CompileUtils might cause class leaks

2022-02-22 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-25969:
---

Assignee: Francesco Guardiani

> org.apache.flink.table.runtime.generated.CompileUtils might cause class leaks
> -
>
> Key: FLINK-25969
> URL: https://issues.apache.org/jira/browse/FLINK-25969
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Major
>
> CompileUtils has two static caches, namely COMPILED_CACHE and 
> COMPILED_EXPRESSION_CACHE. COMPILED_CACHE has been checked and might cache the 
> user ClassLoaders with strong references, thus it might need to be improved. 
> COMPILED_EXPRESSION_CACHE would need a double check.
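As one possible direction, a hedged sketch of a cache that does not hold user ClassLoaders with strong references, using Guava's {{CacheBuilder}}; this is illustrative only, not the actual CompileUtils code:

{code:java}
// Hypothetical sketch: cache compiled classes per ClassLoader while holding the
// ClassLoader keys weakly (identity-based) and the generated classes softly, so
// per-job ClassLoaders can still be garbage collected.
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public final class WeakCompileCacheSketch {

    private static final Cache<ClassLoader, Cache<String, Class<?>>> COMPILED_CACHE =
            CacheBuilder.newBuilder().maximumSize(100).weakKeys().softValues().build();

    public static Class<?> getOrCompile(ClassLoader userClassLoader, String name, String code)
            throws Exception {
        Cache<String, Class<?>> perLoader =
                COMPILED_CACHE.get(userClassLoader, WeakCompileCacheSketch::newPerLoaderCache);
        return perLoader.get(name, () -> doCompile(userClassLoader, name, code));
    }

    private static Cache<String, Class<?>> newPerLoaderCache() {
        return CacheBuilder.newBuilder().maximumSize(100).build();
    }

    private static Class<?> doCompile(ClassLoader cl, String name, String code) {
        // The actual code generation / compilation step is elided in this sketch.
        throw new UnsupportedOperationException("not part of this sketch");
    }
}
{code}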



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25971) org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE might cause class leaks

2022-02-22 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-25971:
---

Assignee: Francesco Guardiani

> org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE
>  might cause class leaks
> -
>
> Key: FLINK-25971
> URL: https://issues.apache.org/jira/browse/FLINK-25971
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>
> It might need a double check if 
> JsonSerdeUtil#OBJECT_MAPPER_INSTANCE -> DefaultDeserializationContext 
> _deserializationContext -> extends DeserializationContext -> 
> DeserializerCache _cache -> _cachedDeserializers / _incompleteDeserializers 
> might hold user ClassLoaders



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25968) Possible class leaks in flink-table / sql modules

2022-02-22 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-25968:
---

Assignee: Francesco Guardiani  (was: Timo Walther)

> Possible class leaks in flink-table / sql modules
> -
>
> Key: FLINK-25968
> URL: https://issues.apache.org/jira/browse/FLINK-25968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Francesco Guardiani
>Priority: Blocker
> Fix For: 1.15.0
>
>
> This is the umbrella issue for possible class leaks in the flink-table / sql 
> planner and runtime modules.
> Currently, for a Flink cluster, the Flink classes are loaded by the system 
> ClassLoader while each job has separate user ClassLoaders. In this case, if 
> some class loaded by the system ClassLoader has static variables that 
> reference classes loaded by the user ClassLoaders, the user ClassLoaders 
> cannot be released, which might cause class leaks. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24423) Cast from TIME to TIMESTAMP currently uses Epoch Date

2022-02-22 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496052#comment-17496052
 ] 

Francesco Guardiani commented on FLINK-24423:
-

Ok, makes sense. Maybe it is worth adding it here? 
https://github.com/apache/flink/pull/18813

> Cast from TIME to TIMESTAMP currently uses Epoch Date
> -
>
> Key: FLINK-24423
> URL: https://issues.apache.org/jira/browse/FLINK-24423
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Marios Trivyzas
>Priority: Major
>
> *.from(TIME(5), 
> LocalTime.parse("12:34:56.1234567")).resultsIn(LocalDateTime.of(1970, 1, 1, 
> 12, 34, 56, 0))*
>  
> PostgreSQL doesn't allow such a cast, whereas Oracle uses the current system 
> Date of the server.
>  
> Should we keep it as is, or zero out the year, so we end up with: 
> *LocalDateTime.of(0, 1, 1, 12, 34, 56, 0)*
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25286) Improve connector testing framework to support more scenarios

2022-02-21 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495611#comment-17495611
 ] 

Francesco Guardiani commented on FLINK-25286:
-

Hi all,
I've just seen this now and I was wondering: perhaps this should have been a FLIP, 
given that all the APIs are annotated as experimental? Or at least, is there a doc 
shared publicly where I can look at the design? Can you also add a README at 
the root of the module, in order to explain the architecture and how this can 
be used?

I have a couple of questions:

* From a quick look, it sounds like this is a completely new injection 
framework. Is there some area where we can reuse the injection provided by 
JUnit 5 instead?
* Can we rely on {{MiniClusterExtension}} for {{MiniCluster}} setup and 
teardown? (see the registration sketch after this list)
* Can the tests run in parallel using the new JUnit 5 parallel execution 
feature?
* We have ArchUnit tests in 
{{flink-architecture-tests/flink-architecture-tests-test}} looking for the 
{{MiniClusterExtension}} or the rule {{MiniClusterResource}}, which now are 
going to fail when you use this new framework (see the pulsar connector). Can 
you please upgrade these rules as well? 
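A minimal sketch of the {{MiniClusterExtension}} registration the question above refers to, assuming JUnit 5; the configuration values are illustrative:

{code:java}
// Sketch of the usual JUnit 5 registration; MiniClusterExtension then owns
// MiniCluster setup and teardown for the ITCase.
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

class SomeConnectorITCase {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());
}
{code}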

Thank you,
FG

> Improve connector testing framework to support more scenarios
> -
>
> Key: FLINK-25286
> URL: https://issues.apache.org/jira/browse/FLINK-25286
> Project: Flink
>  Issue Type: Improvement
>  Components: Test Infrastructure
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently connector testing framework only support tests for DataStream 
> sources, and available scenarios are quite limited by current interface 
> design. 
> This ticket proposes to made improvements to connector testing framework for 
> supporting more test scenarios, and add test suites for sink and Table/SQL 
> API.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26280) Add a flag to disable uid generation

2022-02-21 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26280:
---

 Summary: Add a flag to disable uid generation
 Key: FLINK-26280
 URL: https://issues.apache.org/jira/browse/FLINK-26280
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


We should add a flag to disable uid generation for back-compat. See the 
discussion here: https://issues.apache.org/jira/browse/FLINK-25932
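A hedged sketch of what such a flag could look like as a {{ConfigOption}}; the option key, type and default below are illustrative only, not the final API:

{code:java}
// Hypothetical sketch of a back-compat flag for uid generation.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class UidGenerationOptionsSketch {

    public static final ConfigOption<Boolean> TABLE_EXEC_UID_GENERATION =
            ConfigOptions.key("table.exec.uid.generation")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether transformations created from ExecNodes get an explicit uid. "
                                    + "Disabling this restores the previous behavior for compatibility.");
}
{code}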



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25932) Introduce ExecNodeContext.generateUid()

2022-02-21 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495494#comment-17495494
 ] 

Francesco Guardiani commented on FLINK-25932:
-

Opened an issue: https://issues.apache.org/jira/browse/FLINK-26280

> Introduce ExecNodeContext.generateUid()
> ---
>
> Key: FLINK-25932
> URL: https://issues.apache.org/jira/browse/FLINK-25932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> FLINK-25387 introduced an {{ExecNodeContext}} which contains all information 
> to generate unique and deterministic identifiers for all created 
> {{Transformation}}.
> This issue includes:
> - Add {{ExecNodeContext.generateUid(operatorName: String): String}}
> - Go through all ExecNodes and give transformations a uid. The name can be a 
> constant static field within the ExecNode such that both the annotation and the 
> method can use it.
> - We give all transformations a uid, including stateless ones.
> - The final UID should look like: 
> {{13_stream-exec-sink_1_upsert-materializer}}
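
As a rough illustration of how such a UID string is composed from the pieces 
listed in the description (a sketch, not the actual ExecNodeContext code; the 
parameter names are assumptions):

{code}
public class GenerateUidSketch {

    // A uid such as "13_stream-exec-sink_1_upsert-materializer" combines the ExecNode id,
    // the ExecNode name, the ExecNode version and the per-transformation operator name.
    static String generateUid(int nodeId, String nodeName, int nodeVersion, String operatorName) {
        return nodeId + "_" + nodeName + "_" + nodeVersion + "_" + operatorName;
    }

    public static void main(String[] args) {
        System.out.println(generateUid(13, "stream-exec-sink", 1, "upsert-materializer"));
        // prints: 13_stream-exec-sink_1_upsert-materializer
    }
}
{code}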



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25932) Introduce ExecNodeContext.generateUid()

2022-02-21 Thread Francesco Guardiani (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495455#comment-17495455
 ] 

Francesco Guardiani commented on FLINK-25932:
-

Hi [~wenlong.lwl], I don't understand how an option would help you. All jobs 
are already state incompatible between minor releases these days, as both the 
plan and the json plan change. Even if you were using the json plan from 1.14, 
you won't be able to load it in 1.15, since we heavily modified its shape.

Can you be more specific about which use cases you have in mind, possibly with 
examples?

> Introduce ExecNodeContext.generateUid()
> ---
>
> Key: FLINK-25932
> URL: https://issues.apache.org/jira/browse/FLINK-25932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> FLINK-25387 introduced an {{ExecNodeContext}} which contains all information 
> to generate unique and deterministic identifiers for all created 
> {{Transformation}}.
> This issue includes:
> - Add {{ExecNodeContext.generateUid(operatorName: String): String}}
> - Go through all ExecNodes and give transformations a uid. The name can be a 
> constant static field within the ExecNode such that both the annotation and the 
> method can use it.
> - We give all transformations a uid, including stateless ones.
> - The final UID should look like: 
> {{13_stream-exec-sink_1_upsert-materializer}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26252) Refactor MiniClusterExtension

2022-02-18 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26252:
---

 Summary: Refactor MiniClusterExtension
 Key: FLINK-26252
 URL: https://issues.apache.org/jira/browse/FLINK-26252
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26249) Run BuiltInFunctionITCase tests in parallel

2022-02-18 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26249:
---

 Summary: Run BuiltInFunctionITCase tests in parallel
 Key: FLINK-26249
 URL: https://issues.apache.org/jira/browse/FLINK-26249
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Test Infrastructure
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (FLINK-25129) Update docs to use flink-table-planner-loader instead of flink-table-planner

2022-02-17 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reopened FLINK-25129:
-

I'm reopening as I have a PR to clarify some details, improve wording and add 
more info.

> Update docs to use flink-table-planner-loader instead of flink-table-planner
> 
>
> Key: FLINK-25129
> URL: https://issues.apache.org/jira/browse/FLINK-25129
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Examples, Table SQL / API
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> For more details 
> https://docs.google.com/document/d/12yDUCnvcwU2mODBKTHQ1xhfOq1ujYUrXltiN_rbhT34/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26125) Doc overhaul for the CAST behaviour

2022-02-15 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26125:
---

Assignee: Francesco Guardiani

> Doc overhaul for the CAST behaviour
> ---
>
> Key: FLINK-26125
> URL: https://issues.apache.org/jira/browse/FLINK-26125
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / API
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> This includes:
> * Proper documentation of the new TRY_CAST
> * Add a CAST matrix to document which CAST tuples are supported



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26053) Fix parser generator warnings

2022-02-14 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26053:
---

Assignee: Francesco Guardiani

> Fix parser generator warnings
> -
>
> Key: FLINK-26053
> URL: https://issues.apache.org/jira/browse/FLINK-26053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> When building flink-sql-parser, javacc logs a couple of warnings:
> {code}
> Warning: Choice conflict in [...] construct at line 2920, column 5.
>  Expansion nested within construct and expansion following construct
>  have common prefixes, one of which is: "PLAN"
>  Consider using a lookahead of 2 or more for nested expansion.
> Warning: Choice conflict involving two expansions at
>  line 2930, column 9 and line 2932, column 9 respectively.
>  A common prefix is: "STATEMENT"
>  Consider using a lookahead of 2 for earlier expansion.
> Warning: Choice conflict involving two expansions at
>  line 2952, column 9 and line 2954, column 9 respectively.
>  A common prefix is: "STATEMENT"
>  Consider using a lookahead of 2 for earlier expansion.
> {code}
> Took from 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=3911
> We should investigate them, as they're often a symptom of some bug in our 
> parser template, which might result in unexpected parsing errors.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26131) CompiledPlan should implement Executable

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26131:
---

 Summary: CompiledPlan should implement Executable
 Key: FLINK-26131
 URL: https://issues.apache.org/jira/browse/FLINK-26131
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Francesco Guardiani


In order to do that, we need to keep a reference to the TableEnvironment within 
the implementation of CompiledPlan. We should also check that the 
TableEnvironment used to execute the plan matches the one that compiled it.
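
A minimal sketch of the idea (class and field names below are assumptions for 
illustration, not the actual internal API):

{code}
// Keep the creating TableEnvironment inside the CompiledPlan implementation so that
// execute() can delegate back to it, and so that a mismatch can be detected.
public final class CompiledPlanImplSketch {

    private final Object tableEnvironment; // stand-in for the real TableEnvironment type
    private final String serializedPlan;

    CompiledPlanImplSketch(Object tableEnvironment, String serializedPlan) {
        this.tableEnvironment = tableEnvironment;
        this.serializedPlan = serializedPlan;
    }

    /** Would implement the Executable contract by delegating to the owning environment. */
    public void execute() {
        // In the real implementation this would call the environment's internal
        // plan execution entry point with the serialized plan.
        System.out.println("Executing plan via " + tableEnvironment + ": " + serializedPlan);
    }
}
{code}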



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26127) Cleanup usage of deprecated table methods in doc

2022-02-14 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani reassigned FLINK-26127:
---

Assignee: (was: Francesco Guardiani)

> Cleanup usage of deprecated table methods in doc
> 
>
> Key: FLINK-26127
> URL: https://issues.apache.org/jira/browse/FLINK-26127
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Reporter: Francesco Guardiani
>Priority: Major
>
> In our docs we use a lot of deprecated methods in various examples. We 
> should clean them up and replace them with the new methods.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26127) Cleanup usage of deprecated table methods in doc

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26127:
---

 Summary: Cleanup usage of deprecated table methods in doc
 Key: FLINK-26127
 URL: https://issues.apache.org/jira/browse/FLINK-26127
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


In our docs we use a lot of deprecated methods in various examples. We 
should clean them up and replace them with the new methods.
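
For example, one of the typical replacements (a sketch; the full list of methods 
to migrate still needs to be collected):

{code}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeprecatedUsageExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        env.executeSql(
                "CREATE TABLE source (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        env.executeSql("CREATE TABLE sink (x INT) WITH ('connector' = 'blackhole')");

        // Deprecated pre-FLIP-84 style still shown in some docs:
        // env.sqlUpdate("INSERT INTO sink SELECT * FROM source");
        // env.execute("my job");

        // New style:
        env.executeSql("INSERT INTO sink SELECT * FROM source");
    }
}
{code}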



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26125) Doc overhaul for the CAST behaviour

2022-02-14 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26125:

Component/s: Documentation
 Table SQL / API

> Doc overhaul for the CAST behaviour
> ---
>
> Key: FLINK-26125
> URL: https://issues.apache.org/jira/browse/FLINK-26125
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / API
>Reporter: Francesco Guardiani
>Priority: Major
>
> This includes:
> * Proper documentation of the new TRY_CAST
> * Add a CAST matrix to document which CAST tuples are supported



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26125) Doc overhaul for the CAST behaviour

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26125:
---

 Summary: Doc overhaul for the CAST behaviour
 Key: FLINK-26125
 URL: https://issues.apache.org/jira/browse/FLINK-26125
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


This includes:

* Proper documentation of the new TRY_CAST
* Add a CAST matrix to document which CAST tuples are supported
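
For example, the difference that the TRY_CAST section and the matrix should make 
explicit (a small sketch; behaviour as of the new default CAST semantics):

{code}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TryCastExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // TRY_CAST returns NULL for values that cannot be converted...
        env.executeSql(
                "SELECT s, TRY_CAST(s AS INT) FROM (VALUES ('42'), ('not a number')) AS t(s)")
                .print();

        // ...whereas CAST raises a runtime error for the same input:
        // env.executeSql("SELECT CAST('not a number' AS INT) FROM (VALUES (1))").print();
    }
}
{code}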



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25942) Use jackson jdk8/time modules for Duration ser/de

2022-02-11 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani closed FLINK-25942.
---

> Use jackson jdk8/time modules for Duration ser/de
> -
>
> Key: FLINK-25942
> URL: https://issues.apache.org/jira/browse/FLINK-25942
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> https://issues.apache.org/jira/browse/FLINK-25588 introduced the jackson jdk8 and 
> datetime modules to flink-shaded jackson, so now we don't need our custom Duration 
> ser/de anymore (which was introduced because we lacked these jackson modules).
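
For reference, this is the standard Jackson way of getting java.time.Duration 
support from those modules (a sketch using the vanilla, non-shaded artifacts):

{code}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Duration;

public class DurationSerDeExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new Jdk8Module())      // Optional & friends
                .registerModule(new JavaTimeModule()); // java.time types, including Duration

        String json = mapper.writeValueAsString(Duration.ofSeconds(90)); // "90.000000000"
        Duration back = mapper.readValue(json, Duration.class);
        System.out.println(back); // PT1M30S
    }
}
{code}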



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25844) Expose plan via StatementSet.compilePlan

2022-02-11 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani resolved FLINK-25844.
-
Resolution: Fixed

Merged in 056dd0fcfe5e4c5ba40cdea308f173b9298affdb

> Expose plan via StatementSet.compilePlan
> 
>
> Key: FLINK-25844
> URL: https://issues.apache.org/jira/browse/FLINK-25844
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-StatementSets
> It should be marked as {{@Experimental}}. We should check whether 
> {{StreamStatementSet}} throws a helpful exception for DataStreams that we 
> don't support yet.
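
From the user's perspective, the feature roughly looks like this (a sketch; the 
built-in datagen/blackhole connectors are used just to have something to compile):

{code}
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class CompilePlanFromStatementSet {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        env.executeSql(
                "CREATE TABLE source (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        env.executeSql("CREATE TABLE sink (x INT) WITH ('connector' = 'blackhole')");

        StatementSet set = env.createStatementSet();
        set.addInsertSql("INSERT INTO sink SELECT x FROM source");

        // Experimental: compile the whole statement set into a plan that can be
        // persisted and restored later.
        CompiledPlan plan = set.compilePlan();
        plan.writeToFile("/tmp/statement-set-plan.json");
    }
}
{code}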



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25844) Expose plan via StatementSet.compilePlan

2022-02-11 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani closed FLINK-25844.
---

> Expose plan via StatementSet.compilePlan
> 
>
> Key: FLINK-25844
> URL: https://issues.apache.org/jira/browse/FLINK-25844
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-StatementSets
> It should be marked as {{@Experimental}}. We should check whether 
> {{StreamStatementSet}} throws a helpful exception for DataStreams that we 
> don't support yet.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25844) Expose plan via StatementSet.compilePlan

2022-02-11 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-25844:

Fix Version/s: 1.15.0

> Expose plan via StatementSet.compilePlan
> 
>
> Key: FLINK-25844
> URL: https://issues.apache.org/jira/browse/FLINK-25844
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-StatementSets
> It should be marked as {{@Experimental}}. We should check whether 
> {{StreamStatementSet}} throws a helpful exception for DataStreams that we 
> don't support yet.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26090) Remove pre FLIP-84 methods

2022-02-11 Thread Francesco Guardiani (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-26090:

Component/s: Table SQL / API

> Remove pre FLIP-84 methods
> --
>
> Key: FLINK-26090
> URL: https://issues.apache.org/jira/browse/FLINK-26090
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>
> See 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 
> and https://issues.apache.org/jira/browse/FLINK-16364.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

