[jira] [Created] (FLINK-27201) Remove CollectTableSink

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27201:
---

 Summary: Remove CollectTableSink
 Key: FLINK-27201
 URL: https://issues.apache.org/jira/browse/FLINK-27201
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27200) Replace CollectionTableSource and CollectionTableSink with new table stack alternatives

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27200:
---

 Summary: Replace CollectionTableSource and CollectionTableSink with new table stack alternatives
 Key: FLINK-27200
 URL: https://issues.apache.org/jira/browse/FLINK-27200
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


We need to replace the old CollectionTableSource and CollectionTableSink with 
alternatives from the new stack, for example TestValuesTableFactory or a 
simpler alternative based on TableFactoryHarness.

This task requires identifying the replacement for the old 
CollectionTableSource and CollectionTableSink, possibly creating a new one, 
and then replacing their usages in tests.





[jira] [Created] (FLINK-27198) Cleanup tests testing legacy TableSink/TableSource

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27198:
---

 Summary: Cleanup tests testing legacy TableSink/TableSource
 Key: FLINK-27198
 URL: https://issues.apache.org/jira/browse/FLINK-27198
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


We have many mock TableSink/TableSource implementations used only for testing 
features of the old TableSink/TableSource stack. We should identify them and 
remove them together with their tests. A non-exhaustive list:


{{Implementations of TableSink in  (25 usages found)}}
{{    Test  (14 usages found)}}
{{        flink-python_2.12  (4 usages found)}}
{{            org.apache.flink.table.legacyutils  (4 usages found)}}
{{                legacyTestingSinks.scala  (3 usages found)}}
{{                    39 private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {}}
{{                    69 private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {}}
{{                    95 private[flink] class TestUpsertSink(}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    168 class CollectionTableSink(val outputType: RowTypeInfo)}}
{{        flink-table-common  (1 usage found)}}
{{            org.apache.flink.table.utils  (1 usage found)}}
{{                TypeMappingUtilsTest.java  (1 usage found)}}
{{                    419 private static class TestTableSink implements TableSink> {}}
{{        flink-table-planner_2.12  (9 usages found)}}
{{            org.apache.flink.table.planner.factories.utils  (1 usage found)}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    152 class CollectionTableSink(val schema: TableSchema)}}
{{            org.apache.flink.table.planner.runtime.batch.sql  (1 usage found)}}
{{                PartitionableSinkITCase.scala  (1 usage found)}}
{{                    331 private class TestSink(}}
{{            org.apache.flink.table.planner.runtime.utils  (3 usages found)}}
{{                StreamTestSink.scala  (3 usages found)}}
{{                    265 final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)}}
{{                    344 final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] {}}
{{                    499 final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink[Row] {}}
{{            org.apache.flink.table.planner.utils  (4 usages found)}}
{{                MemoryTableSourceSinkUtil.scala  (3 usages found)}}
{{                    80 final class UnsafeMemoryAppendTableSink}}
{{                    146 final class DataTypeOutputFormatTableSink(}}
{{                    181 final class DataTypeAppendStreamTableSink(}}
{{                testTableSourceSinks.scala  (1 usage found)}}
{{                    1317 class OptionsTableSink(}}

{{Implementations of TableSource in  (31 usages found)}}
{{    Production  (8 usages found)}}
{{        flink-batch-sql-test  (1 usage found)}}
{{            org.apache.flink.sql.tests  (1 usage found)}}
{{                BatchSQLTestProgram.java  (1 usage found)}}
{{                    79 public static class GeneratorTableSource extends InputFormatTableSource {}}
{{        flink-stream-sql-test_2.12  (1 usage found)}}
{{            org.apache.flink.sql.tests  (1 usage found)}}
{{                StreamSQLTestProgram.java  (1 usage found)}}
{{                    192 public static class GeneratorTableSource}}
{{    Test  (23 usages found)}}
{{        flink-python_2.12  (1 usage found)}}
{{            org.apache.flink.table.legacyutils  (1 usage found)}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    127 class CollectionTableSource(}}
{{        flink-sql-client_2.12  (1 usage found)}}
{{            org.apache.flink.table.client.gateway.utils  (1 usage found)}}
{{                SimpleCatalogFactory.java  (1 usage found)}}
{{                    95 new StreamTableSource() {}}
{{        flink-table-api-java  (1 usage found)}}
{{            org.apache.flink.table.utils  (1 usage found)}}
{{                TableSourceMock.java  (1 usage found)}}
{{                    27 public class TableSourceMock implements TableSource {}}
{{        flink-table-common  (1 usage found)}}
{{            org.apache.flink.table.utils  (1 usage found)}}
{{                TypeMappingUtilsTest.java  (1 usage found)}}
{{                    375 private static class TestTableSource}}
{{        flink-table-planner_2.12  (19 usages found)}}
{{            org.apache.flink.table.planner.factories.utils  (1 usage found)}}
{{                TestCollectionTableFactory.scala  (1 usage found)}}
{{                    115 class CollectionTableSource(}}
{{            org.apache.flink.table.planner.plan.stream.sql.join  (2 usages found)}}
{{                LookupJoinTest.scala  (2 usages found)}}
{{                

[jira] [Created] (FLINK-27197) Port ArrowTableSource and PythonInputFormatTableSource to DynamicTableSource

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27197:
---

 Summary: Port ArrowTableSource and PythonInputFormatTableSource to DynamicTableSource
 Key: FLINK-27197
 URL: https://issues.apache.org/jira/browse/FLINK-27197
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-27196) Remove old format interfaces

2022-04-12 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27196:
---

 Summary: Remove old format interfaces
 Key: FLINK-27196
 URL: https://issues.apache.org/jira/browse/FLINK-27196
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-27185) Run the assertj conversion script to convert assertions in connectors and formats

2022-04-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27185:
---

 Summary: Run the assertj conversion script to convert assertions in connectors and formats
 Key: FLINK-27185
 URL: https://issues.apache.org/jira/browse/FLINK-27185
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-27043) Remove CSV connector test usages

2022-04-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-27043:
---

 Summary: Remove CSV connector test usages
 Key: FLINK-27043
 URL: https://issues.apache.org/jira/browse/FLINK-27043
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26988) Better error reporting when format fails

2022-04-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26988:
---

 Summary: Better error reporting when format fails
 Key: FLINK-26988
 URL: https://issues.apache.org/jira/browse/FLINK-26988
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Francesco Guardiani


Today, when a format fails, depending on the format implementation there might 
be no information about the split (file name, offset, etc.), which makes format 
failures hard to debug. For example, here is what I've seen with the CSV format:


{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:531) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:227) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:841) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:767) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.RuntimeException: Invalid UTF-8 middle byte 0x54 (at char #3529, byte #3528): check content encoding, does not look like UTF-8
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator._handleIOException(MappingIterator.java:417) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.hasNext(MappingIterator.java:190) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.formats.csv.CsvReaderFormat$Reader.read(CsvReaderFormat.java:194) ~[flink-csv-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.file.src.impl.StreamFormatAdapter$Reader.readBatch(StreamFormatAdapter.java:214) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
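A minimal sketch of the context this issue asks for, assuming a simple pull-based reader: a decorator re-throws any read failure with the split's file path and byte offset attached, keeping the original exception as the cause. In the trace above, the innermost error only says "byte #3528" without naming the file. `RecordReader` and `SplitContextReader` are hypothetical names; Flink's actual file source interfaces (`StreamFormat.Reader`, `BulkFormat.Reader`) differ, so this only illustrates the idea.

```java
import java.io.IOException;

/** Hypothetical minimal reader interface; not Flink's actual reader API. */
interface RecordReader<T> {
    /** Returns the next record, or null when the split is exhausted. */
    T read() throws IOException;

    /** Byte offset of the record about to be read. */
    long currentOffset();
}

/** Hypothetical decorator that adds the split path and offset to any read failure. */
final class SplitContextReader<T> implements RecordReader<T> {
    private final RecordReader<T> delegate;
    private final String splitPath;

    SplitContextReader(RecordReader<T> delegate, String splitPath) {
        this.delegate = delegate;
        this.splitPath = splitPath;
    }

    @Override
    public T read() throws IOException {
        // Capture the offset before reading, so it points at the failing record.
        long offset = delegate.currentOffset();
        try {
            return delegate.read();
        } catch (IOException | RuntimeException e) {
            // Keep the original exception as the cause; only add context.
            throw new IOException(
                    String.format(
                            "Failed to read record from file '%s' at offset %d", splitPath, offset),
                    e);
        }
    }

    @Override
    public long currentOffset() {
        return delegate.currentOffset();
    }
}
```

Wrapping at the split level means every format gets this context without each format implementation having to add it on its own.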
    

[jira] [Created] (FLINK-26952) Remove old CSV connector

2022-03-31 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26952:
---

 Summary: Remove old CSV connector
 Key: FLINK-26952
 URL: https://issues.apache.org/jira/browse/FLINK-26952
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This involves removing all references to the old CSV connector from our tests 
(mostly TableEnvironmentITCase, TableEnvironmentTest and TableITCase) and 
removing its usage from a couple of e2e tests.





[jira] [Created] (FLINK-26950) Remove TableSink and TableSource interfaces

2022-03-31 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26950:
---

 Summary: Remove TableSink and TableSource interfaces
 Key: FLINK-26950
 URL: https://issues.apache.org/jira/browse/FLINK-26950
 Project: Flink
  Issue Type: Technical Debt
Reporter: Francesco Guardiani


This issue involves removing the old TableSink and TableSource stack, which has 
been superseded by the new DynamicTableSink and DynamicTableSource.





[jira] [Created] (FLINK-26782) Remove PlannerExpression and related

2022-03-21 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26782:
---

 Summary: Remove PlannerExpression and related
 Key: FLINK-26782
 URL: https://issues.apache.org/jira/browse/FLINK-26782
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26704) Remove string expression DSL

2022-03-17 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26704:
---

 Summary: Remove string expression DSL
 Key: FLINK-26704
 URL: https://issues.apache.org/jira/browse/FLINK-26704
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26582) Run the assertj conversion script to convert assertions in flink-table

2022-03-10 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26582:
---

 Summary: Run the assertj conversion script to convert assertions in flink-table
 Key: FLINK-26582
 URL: https://issues.apache.org/jira/browse/FLINK-26582
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26553) Enable scalafmt for scala codebase

2022-03-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26553:
---

 Summary: Enable scalafmt for scala codebase
 Key: FLINK-26553
 URL: https://issues.apache.org/jira/browse/FLINK-26553
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


As discussed in 
https://lists.apache.org/thread/97398pc9cb8y922xlb6mzlsbjtjf5jnv, we should 
enable scalafmt for our Scala codebase.





[jira] [Created] (FLINK-26551) Make the legacy behavior disabled by default

2022-03-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26551:
---

 Summary: Make the legacy behavior disabled by default
 Key: FLINK-26551
 URL: https://issues.apache.org/jira/browse/FLINK-26551
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


Follow-up of https://issues.apache.org/jira/browse/FLINK-25111

For the discussion, see 
https://lists.apache.org/thread/r13y3plwwyg3sngh8cz47flogq621txv





[jira] [Created] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

2022-03-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26549:
---

 Summary: INSERT INTO with VALUES leads to wrong type inference with nested types
 Key: FLINK-26549
 URL: https://issues.apache.org/jira/browse/FLINK-26549
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


While working on casting, I found an interesting bug in the type inference of 
INSERT INTO ... VALUES. It shows up in 
{{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (see this version in 
particular: 
https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).

The test scenario is an INSERT INTO ... VALUES query that also writes metadata 
to a Kafka table, in particular the headers metadata.

The table is declared as follows:

{code:sql}
 CREATE TABLE kafka (
  `physical_1` STRING,
  `physical_2` INT,
  `timestamp-type` STRING METADATA VIRTUAL,
  `timestamp` TIMESTAMP(3) METADATA,
  `leader-epoch` INT METADATA VIRTUAL,
  `headers` MAP<STRING, BYTES> METADATA,
  `partition` INT METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
  `physical_3` BOOLEAN
) WITH (
   'connector' = 'kafka',
   [...]
)
{code}

The INSERT INTO query is:

{code:sql}
INSERT INTO kafka VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
{code}

Note that in the first row the longest byte literal has length 3, while in the 
last row both byte literals have length 1.
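In the optimized physical plan, the first row's map `MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), ...)` is first cast to `(CHAR(2) ..., BINARY(1) NOT NULL) MAP`, a value type too narrow for the 3-byte literal `X'c0ffee'`. The following is a simplified sketch of a widening rule that never narrows, assuming only BINARY/VARBINARY inputs; `BinaryType` and `BinaryTypeWidening` are hypothetical, and this is not Flink's `LogicalTypeMerging` logic.

```java
import java.util.List;

/** Hypothetical binary string type: fixed-length BINARY(n) or variable-length VARBINARY(n). */
final class BinaryType {
    final boolean fixedLength;
    final int length;

    BinaryType(boolean fixedLength, int length) {
        this.fixedLength = fixedLength;
        this.length = length;
    }

    @Override
    public String toString() {
        return (fixedLength ? "BINARY(" : "VARBINARY(") + length + ")";
    }
}

final class BinaryTypeWidening {
    /**
     * Hypothetical widening rule: the result must hold every input value without truncation,
     * so its length is the maximum input length. It stays fixed-length only when all inputs
     * are fixed-length with the same length; otherwise it becomes variable-length.
     */
    static BinaryType commonType(List<BinaryType> types) {
        int firstLength = types.get(0).length;
        int maxLength = 0;
        boolean allFixedSameLength = true;
        for (BinaryType t : types) {
            maxLength = Math.max(maxLength, t.length);
            if (!t.fixedLength || t.length != firstLength) {
                allFixedSameLength = false;
            }
        }
        return new BinaryType(allFixedSameLength, maxLength);
    }
}
```

Under this rule, the value types of rows 1 and 3 (`VARBINARY(3)` and `BINARY(1)`) widen to `VARBINARY(3)`, so no literal is truncated.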

The generated plan of this INSERT INTO is:

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
         +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
      +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, 

[jira] [Created] (FLINK-26520) Implement SEARCH operator

2022-03-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26520:
---

 Summary: Implement SEARCH operator
 Key: FLINK-26520
 URL: https://issues.apache.org/jira/browse/FLINK-26520
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


The code generator currently does not implement the SEARCH operator; instead, 
it uses the RexBuilder to work around it. We should implement the SEARCH 
operator directly, to remove this usage of the Flink type factory.
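For context: Calcite can fold predicates such as `x IN (1, 5) OR x BETWEEN 10 AND 20` into a single `SEARCH(x, Sarg[...])` call, where the Sarg is a set of points and ranges. Evaluating SEARCH directly amounts to a membership test against those ranges, as in this sketch. `Interval` and `SearchOperator` are hypothetical, values are simplified to `long`, and the null handling of Calcite's `Sarg` is not modeled.

```java
import java.util.List;

/** Hypothetical interval over long values with open or closed bounds. */
final class Interval {
    final long lower;
    final boolean lowerInclusive;
    final long upper;
    final boolean upperInclusive;

    Interval(long lower, boolean lowerInclusive, long upper, boolean upperInclusive) {
        this.lower = lower;
        this.lowerInclusive = lowerInclusive;
        this.upper = upper;
        this.upperInclusive = upperInclusive;
    }

    /** A single point, e.g. one element of an IN list. */
    static Interval point(long v) {
        return new Interval(v, true, v, true);
    }

    boolean contains(long v) {
        boolean aboveLower = lowerInclusive ? v >= lower : v > lower;
        boolean belowUpper = upperInclusive ? v <= upper : v < upper;
        return aboveLower && belowUpper;
    }
}

final class SearchOperator {
    /** SEARCH(value, ranges) is true when the value falls inside any of the ranges. */
    static boolean search(long value, List<Interval> ranges) {
        for (Interval r : ranges) {
            if (r.contains(value)) {
                return true;
            }
        }
        return false;
    }
}
```

Generated code could do the same with a loop or a chain of comparisons, without first expanding SEARCH back into OR/AND expressions.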





[jira] [Created] (FLINK-26519) Remove FlinkTypeFactory.INSTANCE singleton

2022-03-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26519:
---

 Summary: Remove FlinkTypeFactory.INSTANCE singleton
 Key: FLINK-26519
 URL: https://issues.apache.org/jira/browse/FLINK-26519
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This singleton is not correct, as the user classloader needs to be passed in 
order to create structured types.
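A JVM-wide singleton can only hold one classloader, while each job may bring its own user classes (for example the POJOs behind structured types). A minimal sketch, assuming a hypothetical `ClassLoaderAwareTypeFactory` that is created with the user classloader instead of being shared globally:

```java
/** Hypothetical type factory that carries the user classloader instead of being a singleton. */
final class ClassLoaderAwareTypeFactory {
    private final ClassLoader userClassLoader;

    ClassLoaderAwareTypeFactory(ClassLoader userClassLoader) {
        this.userClassLoader = userClassLoader;
    }

    /** Resolves the implementation class of a structured type through the user classloader. */
    Class<?> resolveStructuredClass(String className) {
        try {
            // initialize=false: resolving the type must not run the class's static initializers.
            return Class.forName(className, false, userClassLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Cannot load structured type class '" + className + "' with the user classloader",
                    e);
        }
    }
}
```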





[jira] [Created] (FLINK-26463) TableEnvironmentITCase should use MiniCluster

2022-03-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26463:
---

 Summary: TableEnvironmentITCase should use MiniCluster
 Key: FLINK-26463
 URL: https://issues.apache.org/jira/browse/FLINK-26463
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26422) Update Chinese documentation with the new TablePipeline docs

2022-03-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26422:
---

 Summary: Update Chinese documentation with the new TablePipeline docs
 Key: FLINK-26422
 URL: https://issues.apache.org/jira/browse/FLINK-26422
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Francesco Guardiani


The Chinese docs need to be updated with the content of this commit: 
https://github.com/apache/flink/commit/4f65c7950f2c3ef849f2094deab0e199ffedf57b





[jira] [Created] (FLINK-26280) Add a flag to disable uid generation

2022-02-21 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26280:
---

 Summary: Add a flag to disable uid generation
 Key: FLINK-26280
 URL: https://issues.apache.org/jira/browse/FLINK-26280
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


We should add a flag to disable uid generation for backward compatibility. See 
the discussion here: https://issues.apache.org/jira/browse/FLINK-25932





[jira] [Created] (FLINK-26252) Refactor MiniClusterExtension

2022-02-18 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26252:
---

 Summary: Refactor MiniClusterExtension
 Key: FLINK-26252
 URL: https://issues.apache.org/jira/browse/FLINK-26252
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26249) Run BuiltInFunctionITCase tests in parallel

2022-02-18 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26249:
---

 Summary: Run BuiltInFunctionITCase tests in parallel
 Key: FLINK-26249
 URL: https://issues.apache.org/jira/browse/FLINK-26249
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Test Infrastructure
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-26131) CompiledPlan should implement Executable

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26131:
---

 Summary: CompiledPlan should implement Executable
 Key: FLINK-26131
 URL: https://issues.apache.org/jira/browse/FLINK-26131
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Francesco Guardiani


To do that, the CompiledPlan implementation needs to keep a reference to the 
TableEnvironment that created it. We should also check that the 
TableEnvironment matches.
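One plausible shape for this, sketched with hypothetical stand-ins (`ExecutableSketch`, `SketchTableEnvironment`, `CompiledPlanSketch`) rather than Flink's actual interfaces: the plan keeps a reference to the environment that compiled it, so `execute()` needs no arguments, and the environment rejects plans compiled by a different instance.

```java
/** Hypothetical stand-in for an "executable" abstraction. */
interface ExecutableSketch {
    String execute();
}

/** Hypothetical stand-in for a TableEnvironment. */
final class SketchTableEnvironment {
    private final String name;

    SketchTableEnvironment(String name) {
        this.name = name;
    }

    /** Compiles a statement into a plan bound to this environment. */
    CompiledPlanSketch compilePlan(String statement) {
        return new CompiledPlanSketch(this, "plan(" + statement + ")");
    }

    /** Executes a plan, rejecting plans compiled by another environment. */
    String executePlan(CompiledPlanSketch plan) {
        if (plan.owner() != this) {
            throw new IllegalStateException(
                    "The plan was compiled by a different TableEnvironment than " + name);
        }
        return name + " ran " + plan.serialized();
    }
}

/** Hypothetical plan that remembers the environment it was compiled with. */
final class CompiledPlanSketch implements ExecutableSketch {
    private final SketchTableEnvironment owner;
    private final String serialized;

    CompiledPlanSketch(SketchTableEnvironment owner, String serialized) {
        this.owner = owner;
        this.serialized = serialized;
    }

    SketchTableEnvironment owner() {
        return owner;
    }

    String serialized() {
        return serialized;
    }

    @Override
    public String execute() {
        // No environment argument needed: the plan kept a reference at compile time.
        return owner.executePlan(this);
    }
}
```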





[jira] [Created] (FLINK-26128) Improve Table, Expressions and related classes Javadocs/Scaladocs

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26128:
---

 Summary: Improve Table, Expressions and related classes Javadocs/Scaladocs
 Key: FLINK-26128
 URL: https://issues.apache.org/jira/browse/FLINK-26128
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Reporter: Francesco Guardiani


The Javadocs and Scaladocs of Table and Expressions are currently 
inconsistent: each uses a different format, and some Javadocs even contain 
Scala examples. We should reword them consistently, remove Scala examples where 
they are not necessary, and expand the Java examples.





[jira] [Created] (FLINK-26127) Cleanup usage of deprecated table methods in doc

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26127:
---

 Summary: Cleanup usage of deprecated table methods in doc
 Key: FLINK-26127
 URL: https://issues.apache.org/jira/browse/FLINK-26127
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


Many examples in our docs use deprecated methods. We should clean them up and 
replace them with their non-deprecated counterparts.





[jira] [Created] (FLINK-26125) Doc overhaul for the CAST behaviour

2022-02-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26125:
---

 Summary: Doc overhaul for the CAST behaviour
 Key: FLINK-26125
 URL: https://issues.apache.org/jira/browse/FLINK-26125
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


This includes:

* Proper documentation of the new TRY_CAST
* Add a CAST matrix documenting which source/target type combinations are supported
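As a reference point for the TRY_CAST documentation: with the new CAST behaviour (legacy behaviour disabled), CAST fails on an input that cannot be converted, whereas TRY_CAST returns NULL instead. The hypothetical helpers below mirror that contract for a STRING to INT conversion in plain Java, with SQL NULL modeled as Java `null`; they are not Flink's cast implementation.

```java
/** Hypothetical helpers mirroring CAST vs TRY_CAST semantics for STRING -> INT. */
final class CastSemantics {
    /** CAST: a NULL input stays NULL, an invalid input fails. */
    static Integer cast(String value) {
        if (value == null) {
            return null;
        }
        try {
            return Integer.valueOf(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Cannot cast '" + value + "' to INT", e);
        }
    }

    /** TRY_CAST: same conversion, but an invalid input yields NULL instead of failing. */
    static Integer tryCast(String value) {
        try {
            return cast(value);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }
}
```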





[jira] [Created] (FLINK-26090) Remove pre FLIP-84 methods

2022-02-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26090:
---

 Summary: Remove pre FLIP-84 methods
 Key: FLINK-26090
 URL: https://issues.apache.org/jira/browse/FLINK-26090
 Project: Flink
  Issue Type: Bug
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 
and https://issues.apache.org/jira/browse/FLINK-16364.





[jira] [Created] (FLINK-26089) Introduce TablePipeline

2022-02-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26089:
---

 Summary: Introduce TablePipeline
 Key: FLINK-26089
 URL: https://issues.apache.org/jira/browse/FLINK-26089
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


See 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI





[jira] [Created] (FLINK-26071) TableEnvironment#compilePlan should fail

2022-02-10 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26071:
---

 Summary: TableEnvironment#compilePlan should fail
 Key: FLINK-26071
 URL: https://issues.apache.org/jira/browse/FLINK-26071
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


FLINK-25841 introduced {{compilePlan}} in {{TableEnvironment}}. This API should 
fail eagerly, instead of deferring the failure to {{writeToFile}}.





[jira] [Created] (FLINK-26060) Make Python specific exec nodes unsupported

2022-02-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26060:
---

 Summary: Make Python specific exec nodes unsupported 
 Key: FLINK-26060
 URL: https://issues.apache.org/jira/browse/FLINK-26060
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


These nodes use the old type system, which is going to be removed soon. We 
should not support them in the persisted plan, as we cannot commit to 
supporting them. Once migrated to the new type system, PyFlink won't need these 
nodes anymore and will rely on the new Table function stack. For more details, 
see also https://issues.apache.org/jira/browse/FLINK-25231





[jira] [Created] (FLINK-26054) Enable maven-enforcer to disable table-planner and table-runtime as dependencies

2022-02-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26054:
---

 Summary: Enable maven-enforcer to disable table-planner and table-runtime as dependencies
 Key: FLINK-26054
 URL: https://issues.apache.org/jira/browse/FLINK-26054
 Project: Flink
  Issue Type: Bug
  Components: Build System, Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


From https://github.com/apache/flink/pull/18676#discussion_r802502438: we 
should enable the enforcer for the table-planner and table-runtime modules, as 
described in flink-table/README.md.





[jira] [Created] (FLINK-26053) Fix parser generator warnings

2022-02-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-26053:
---

 Summary: Fix parser generator warnings
 Key: FLINK-26053
 URL: https://issues.apache.org/jira/browse/FLINK-26053
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Francesco Guardiani


When building flink-sql-parser, JavaCC logs a couple of warnings:


{code}
Warning: Choice conflict in [...] construct at line 2920, column 5.
 Expansion nested within construct and expansion following construct
 have common prefixes, one of which is: "PLAN"
 Consider using a lookahead of 2 or more for nested expansion.
Warning: Choice conflict involving two expansions at
 line 2930, column 9 and line 2932, column 9 respectively.
 A common prefix is: "STATEMENT"
 Consider using a lookahead of 2 for earlier expansion.
Warning: Choice conflict involving two expansions at
 line 2952, column 9 and line 2954, column 9 respectively.
 A common prefix is: "STATEMENT"
 Consider using a lookahead of 2 for earlier expansion.
{code}

Taken from 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=3911

We should investigate them, as they are often a symptom of a bug in our parser 
template, which might result in unexpected parsing errors.





[jira] [Created] (FLINK-25986) Add FLIP-190 new API methods to python

2022-02-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25986:
---

 Summary: Add FLIP-190 new API methods to python
 Key: FLINK-25986
 URL: https://issues.apache.org/jira/browse/FLINK-25986
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Table SQL / API
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-25942) Use jackson jdk8/time modules for Duration ser/de

2022-02-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25942:
---

 Summary: Use jackson jdk8/time modules for Duration ser/de
 Key: FLINK-25942
 URL: https://issues.apache.org/jira/browse/FLINK-25942
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


https://issues.apache.org/jira/browse/FLINK-25588 introduced the jackson jdk8 and 
datetime modules to flink-shaded jackson, so we no longer need our own Duration 
ser/de (which was introduced because we lacked this jackson module).





[jira] [Created] (FLINK-25918) Use FileEnumerator to implement filter pushdown of filepath metadata

2022-02-02 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25918:
---

 Summary: Use FileEnumerator to implement filter pushdown of 
filepath metadata 
 Key: FLINK-25918
 URL: https://issues.apache.org/jira/browse/FLINK-25918
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Francesco Guardiani


Right now, unless you configure partition keys, the table file source will 
ingest all the files in the provided {{path}}.

This means that a query like:

{code:sql}
SELECT * FROM MyFileTable WHERE filepath LIKE '%.csv'
{code}

will ingest all the files, and only after the records are loaded in Flink does 
the filtering happen, discarding all the records not coming from a file matching 
the pattern {{%.csv}}.

Using the filter push down feature provided by the DynamicTableSource stack, we 
could instead directly provide the {{FileSourceBuilder}} with a {{FileEnumerator}} 
that filters the input files, so we can skip reading them entirely.
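A minimal sketch of the filtering such a {{FileEnumerator}} could apply, assuming the pushed-down predicate has the simple form {{filepath LIKE '<pattern>'}}. {{LikePathFilter}} is a hypothetical JDK-only helper, not Flink's {{FileEnumerator}} or {{FileSourceBuilder}} API, and it only translates the {{%}} and {{_}} wildcards (no ESCAPE clause).

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not Flink API: keeps only the file paths that match a
// pushed-down predicate `filepath LIKE '<pattern>'`.
// Assumption: only '%' and '_' wildcards are supported, no ESCAPE clause.
final class LikePathFilter {
    private final Pattern pattern;

    LikePathFilter(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*"); // '%' matches any sequence of characters
            } else if (c == '_') {
                regex.append('.'); // '_' matches exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        // DOTALL so that the wildcards also match line terminators
        this.pattern = Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    boolean accept(String path) {
        return pattern.matcher(path).matches();
    }

    // Filters the candidate files before any of them is opened.
    List<String> enumerate(List<String> candidatePaths) {
        return candidatePaths.stream().filter(this::accept).collect(Collectors.toList());
    }
}
```

With the pattern {{%.csv}}, only the {{.csv}} paths survive the enumeration, so the other files are never opened.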





[jira] [Created] (FLINK-25897) Update project configuration gradle doc to 7.x version

2022-01-31 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25897:
---

 Summary: Update project configuration gradle doc to 7.x version
 Key: FLINK-25897
 URL: https://issues.apache.org/jira/browse/FLINK-25897
 Project: Flink
  Issue Type: Bug
Reporter: Francesco Guardiani
Assignee: Mario Rincon-Nigro


Update the gradle build script and its doc page to 7.x





[jira] [Created] (FLINK-25895) Add ExecNodeGraph ser/de

2022-01-31 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25895:
---

 Summary: Add ExecNodeGraph ser/de
 Key: FLINK-25895
 URL: https://issues.apache.org/jira/browse/FLINK-25895
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


Right now we have an intermediate model called {{JsonPlanGraph}} to ser/de the 
ExecNodeGraph. This model is fine, but we should do the conversion through 
jackson rather than serializing/deserializing it manually with {{ExecNodeGraphJsonPlanGenerator}}.





[jira] [Created] (FLINK-25809) Introduce test infra for building FLIP-190 tests

2022-01-25 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25809:
---

 Summary: Introduce test infra for building FLIP-190 tests 
 Key: FLINK-25809
 URL: https://issues.apache.org/jira/browse/FLINK-25809
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


FLIP-190 requires building a new test infrastructure. With it, we want 
to define test cases and data once, and then for each case execute 
the following:

* Integration test that roughly does {{create plan -> execute job -> trigger 
savepoint -> stop job -> restore plan -> restore savepoint -> execute job -> 
stop and assert}}. Plan and savepoint should be committed to git, so running 
these tests when a plan and savepoint are already available will not regenerate 
them.
* Change detection test to check that for the defined test cases, the plan 
hasn't been changed. Similar to the existing {{JsonPlanITCase}} tests.
* Completeness/coverage of tests: count how many times ExecNodes 
(including versions) are used in the test cases, and fail if an ExecNode version is 
never covered.

Other requirements include "versioning" the test cases: for each test 
case we can retain different versions of the plan and savepoint, in order to 
make sure that after we introduce a new plan change, the old plan still 
runs.





[jira] [Created] (FLINK-25791) Make ObjectIdentifier json representation simpler

2022-01-24 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25791:
---

 Summary: Make ObjectIdentifier json representation simpler
 Key: FLINK-25791
 URL: https://issues.apache.org/jira/browse/FLINK-25791
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


Use {{ObjectIdentifier#asSerializableString}} to serialize the object 
identifier, rather than serializing it as an object.
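A sketch of the compact single-string encoding, assuming the usual Flink SQL identifier escaping: each part wrapped in backticks, embedded backticks doubled, parts joined by dots. {{IdentifierEncoding}} is a hypothetical helper; the exact output of the real {{asSerializableString}} should be checked against its implementation.

```java
// Hypothetical helper, not Flink API: encodes a catalog.database.object
// identifier as one string instead of a JSON object with three fields.
// Assumption: backtick quoting with doubled backticks as the escape.
final class IdentifierEncoding {
    static String escape(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    static String asSerializableString(String catalog, String database, String object) {
        return escape(catalog) + "." + escape(database) + "." + escape(object);
    }
}
```

Quoting every part keeps the encoding unambiguous even when a name contains dots or backticks.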





[jira] [Created] (FLINK-25779) Define ConfigOption for properties map in Kinesis

2022-01-24 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25779:
---

 Summary: Define ConfigOption for properties map in Kinesis
 Key: FLINK-25779
 URL: https://issues.apache.org/jira/browse/FLINK-25779
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Francesco Guardiani


Currently there is no {{ConfigOption}} definition of {{properties.*}} in the Kinesis 
connector. This breaks option forwarding when restoring the table options from 
the persisted plan/catalog. We can define it using 
{{ConfigOptionBuilder#mapType}}.





[jira] [Created] (FLINK-25778) Define ConfigOption for properties map in Kafka

2022-01-24 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25778:
---

 Summary: Define ConfigOption for properties map in Kafka
 Key: FLINK-25778
 URL: https://issues.apache.org/jira/browse/FLINK-25778
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Francesco Guardiani


Currently there is no {{ConfigOption}} definition of {{properties.*}} in the Kafka 
connector. This breaks option forwarding when restoring the table options from 
the persisted plan/catalog. We can define it using 
{{ConfigOptionBuilder#mapType}}.
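To illustrate what a map-typed option for this prefix would capture: every table option starting with {{properties.}} becomes one entry, keyed by the rest of the option name. {{PrefixedProperties}} is a hypothetical JDK-only helper, not Flink's option parsing, and the option values in the example are made up.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not Flink API: collects all options sharing a prefix
// into one map, stripping the prefix from the keys.
final class PrefixedProperties {
    static Map<String, String> extract(Map<String, String> tableOptions, String prefix) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```

For example, {{properties.bootstrap.servers}} and {{properties.group.id}} end up as the entries {{bootstrap.servers}} and {{group.id}}, while {{connector}} is left out.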





[jira] [Created] (FLINK-25777) Generate documentation for Table factories (formats and connectors)

2022-01-24 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25777:
---

 Summary: Generate documentation for Table factories (formats and 
connectors)
 Key: FLINK-25777
 URL: https://issues.apache.org/jira/browse/FLINK-25777
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Common, Documentation, Formats (JSON, Avro, 
Parquet, ORC, SequenceFile)
Reporter: Francesco Guardiani


The goal of this issue is to automatically generate, from code, the documentation 
of configuration options for table connectors and formats.

This issue includes:

* Tweak {{ConfigOptionsDocGenerator}} to work with {{Factory#requiredOptions}}, 
{{Factory#optionalOptions}} and the newly introduced 
{{DynamicTableFactory#forwardOptions}} and {{FormatFactory#forwardOptions}}. 
Also see https://github.com/apache/flink/pull/18290 for reference. From 
these methods we should extract whether an option is required, and whether it is 
forwardable.
* Decide where the generator output should go, and how to link/include it in 
the connector/format docs pages.
* Enable the code generator in CI.
* Regenerate all the existing docs.





[jira] [Created] (FLINK-25588) Add jdk8 and datetime module to jackson shaded

2022-01-10 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25588:
---

 Summary: Add jdk8 and datetime module to jackson shaded
 Key: FLINK-25588
 URL: https://issues.apache.org/jira/browse/FLINK-25588
 Project: Flink
  Issue Type: Improvement
  Components: BuildSystem / Shaded
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


Add modules jdk8 and jsr310 to jackson shaded. 
https://github.com/FasterXML/jackson-modules-java8





[jira] [Created] (FLINK-25526) Deprecate TableSinkFactory, TableSourceFactory and TableFormatFactory

2022-01-05 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25526:
---

 Summary: Deprecate TableSinkFactory, TableSourceFactory and 
TableFormatFactory
 Key: FLINK-25526
 URL: https://issues.apache.org/jira/browse/FLINK-25526
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / API
Reporter: Francesco Guardiani
 Fix For: 1.15.0


These factories are part of the old type system/sink & source stack and should 
not be used anymore, as users should work with 
DynamicTableSink/DynamicTableSource factories instead.

This task should deprecate {{TableSinkFactory}}, {{TableSourceFactory}} and 
{{TableFormatFactory}} and all the related types.





[jira] [Created] (FLINK-25518) Harden JSON Serialization utilities

2022-01-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25518:
---

 Summary: Harden JSON Serialization utilities
 Key: FLINK-25518
 URL: https://issues.apache.org/jira/browse/FLINK-25518
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-25428) Expose complex types CAST to String

2021-12-23 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25428:
---

 Summary: Expose complex types CAST to String
 Key: FLINK-25428
 URL: https://issues.apache.org/jira/browse/FLINK-25428
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
 Attachments: cast_function_it_case.patch, logical_type_casts.patch

Right now we have all the casting rules for collection, structured and raw 
types to string, that is, we have logic to stringify the following types:

* ARRAY
* MAP
* MULTISET
* ROW
* STRUCTURED
* RAW

Unfortunately these don't work, for different reasons, notably:

* We need to support these combinations in {{LogicalTypeCasts}} (check the 
attached patch)
* For some of them Calcite applies its casting validation logic and marks them 
as invalid
* For MULTISET and STRUCTURED, there are issues specific to the Table API and its 
expression stack, which cannot correctly convert the values to literals

You can reproduce all these errors by applying the attached patch to the cast 
function IT cases.

We need to fix these issues, so users can use SQL and Table API to cast these 
values to string.





[jira] [Created] (FLINK-25300) Remove CEIL and FLOOR call with one argument for integral values

2021-12-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25300:
---

 Summary: Remove CEIL and FLOOR call with one argument for integral 
values
 Key: FLINK-25300
 URL: https://issues.apache.org/jira/browse/FLINK-25300
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Runtime
Reporter: Francesco Guardiani


Right now, when the user invokes {{FLOOR}} or {{CEIL}} on any integral 
number, a call to {{Math.floor}}/{{Math.ceil}} is generated, which is a no-op and 
might even produce incorrect results, as the argument is first widened to {{double}}.

We should rather implement a rule in the planner (or reuse the expression 
reducer rule) that removes {{FLOOR}} and {{CEIL}} when their argument is of an 
integral type, that is TINYINT, SMALLINT, INT or BIGINT.
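A small JDK-only illustration of the incorrect results, assuming the generated code boils down to {{(long) Math.floor(value)}} for a BIGINT argument (the helper names are hypothetical). Since {{Math.floor}} and {{Math.ceil}} take a {{double}}, values above 2^53 can already be rounded before the call happens.

```java
// Hypothetical helpers illustrating the issue, not Flink's generated code.
final class IntegralFloorDemo {
    // Roughly what the generated call amounts to for a BIGINT (assumption):
    // the long is widened to double, which is exact only up to 2^53.
    static long floorViaDouble(long value) {
        return (long) Math.floor(value);
    }

    static long ceilViaDouble(long value) {
        return (long) Math.ceil(value);
    }

    // What the proposed rule yields: FLOOR/CEIL of an integer is the integer itself.
    static long floorIntegral(long value) {
        return value;
    }
}
```

For 2^53 + 1 = 9007199254740993, both {{floorViaDouble}} and {{ceilViaDouble}} return 9007199254740992, while dropping the call returns the input unchanged.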





[jira] [Created] (FLINK-25299) Improve LIKE operator efficiency

2021-12-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25299:
---

 Summary: Improve LIKE operator efficiency
 Key: FLINK-25299
 URL: https://issues.apache.org/jira/browse/FLINK-25299
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Runtime
Reporter: Francesco Guardiani


Right now LIKE is implemented using regexes (check {{SqlLikeUtils}} provided by 
https://issues.apache.org/jira/browse/FLINK-25282).

We can improve it either by removing the regex-based implementation and implementing 
the matching code manually, or at least by caching the regex in the generated code, 
assigning the compiled pattern to a field of the generated function, as 
parsing the pattern is usually an expensive operation.
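A sketch of the first option, a hand-written matcher that avoids regexes entirely, using the classic greedy wildcard-matching algorithm that backtracks to the last {{%}}. This is not {{SqlLikeUtils}}; {{LikeMatcher}} is a hypothetical name, and the sketch assumes only {{%}} and {{_}} wildcards, no ESCAPE clause and non-null inputs.

```java
// Hypothetical LIKE matcher without regexes (not Flink code).
// Assumptions: '%' and '_' wildcards only, no ESCAPE clause, non-null inputs.
final class LikeMatcher {
    static boolean like(String s, String p) {
        int i = 0;            // position in s
        int j = 0;            // position in p
        int lastPercent = -1; // position of the last '%' seen in p
        int resume = 0;       // position in s where that '%' started matching
        while (i < s.length()) {
            if (j < p.length() && p.charAt(j) == '%') {
                lastPercent = j++;
                resume = i;
            } else if (j < p.length() && (p.charAt(j) == '_' || p.charAt(j) == s.charAt(i))) {
                i++;
                j++;
            } else if (lastPercent != -1) {
                // Mismatch: let the last '%' absorb one more character and retry.
                j = lastPercent + 1;
                i = ++resume;
            } else {
                return false;
            }
        }
        // Trailing '%' wildcards can match the empty string.
        while (j < p.length() && p.charAt(j) == '%') {
            j++;
        }
        return j == p.length();
    }
}
```

The method allocates nothing per call and is O(length(s) * length(p)) in the worst case, so there is no pattern to parse or cache at all.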





[jira] [Created] (FLINK-25282) Move runtime dependencies from table-planner to table-runtime

2021-12-13 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25282:
---

 Summary: Move runtime dependencies from table-planner to 
table-runtime
 Key: FLINK-25282
 URL: https://issues.apache.org/jira/browse/FLINK-25282
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


There are several runtime dependencies (e.g. functions used in codegen) that 
are shipped by table-planner and calcite-core. We should move these 
dependencies to table-runtime.





[jira] [Created] (FLINK-25229) Introduce flink-table-api-bridge-common

2021-12-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25229:
---

 Summary: Introduce flink-table-api-bridge-common
 Key: FLINK-25229
 URL: https://issues.apache.org/jira/browse/FLINK-25229
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Planner
Reporter: Francesco Guardiani


This package should deduplicate code from api-java-bridge and api-scala-bridge, 
notably:

* the various operations provided by both {{ScalaDataStreamQueryOperation}} and 
{{JavaDataStreamQueryOperation}} (which are essentially the same code)
* some code in {{StreamTableEnvironmentImpl}} and {{StreamStatementSetImpl}}

The end goal is that the planner no longer has a runtime (not test) dependency on 
flink-scala-api and flink-scala-api-bridge.





[jira] [Created] (FLINK-25228) Introduce flink-table-test-utils

2021-12-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25228:
---

 Summary: Introduce flink-table-test-utils
 Key: FLINK-25228
 URL: https://issues.apache.org/jira/browse/FLINK-25228
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Ecosystem, Table SQL / 
Planner
Reporter: Francesco Guardiani


Introduce a package to ship test utilities for formats, connectors and end 
users.

This package should provide:

* Assertions for data types, logical types and internal data structures.
* Test cases for formats and connectors

The end goal is to remove the test-jar planner dependency in formats and 
connectors and replace it with this package, so formats and connectors can then 
just depend on table-planner-loader.





[jira] [Created] (FLINK-25158) Fix formatting for true, false and null to uppercase

2021-12-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25158:
---

 Summary: Fix formatting for true, false and null to uppercase
 Key: FLINK-25158
 URL: https://issues.apache.org/jira/browse/FLINK-25158
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


All the cast rules using the constant strings {{true}}, {{false}} and {{null}} 
should use {{TRUE}}, {{FALSE}} and {{NULL}} instead.

This new behavior should apply only when legacy behavior is disabled.
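A hypothetical sketch of the switch, using a stringified ARRAY<BOOLEAN> so that all three constants appear. The class name, the {{legacyBehavior}} parameter and the {{[a, b]}} array format are assumptions for illustration, not Flink's CastRule code.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical sketch, not Flink's CastRule implementation.
final class BooleanArrayToString {
    // Renders one element: lowercase constants in legacy mode, uppercase otherwise.
    static String constant(Boolean value, boolean legacyBehavior) {
        String s = value == null ? "null" : value.toString(); // "true", "false" or "null"
        return legacyBehavior ? s : s.toUpperCase(Locale.ROOT);
    }

    static String cast(Boolean[] array, boolean legacyBehavior) {
        return Arrays.stream(array)
                .map(v -> constant(v, legacyBehavior))
                .collect(Collectors.joining(", ", "[", "]"));
    }
}
```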





[jira] [Created] (FLINK-25156) DISTINCT is not handled correctly by CastRules

2021-12-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25156:
---

 Summary: DISTINCT is not handled correctly by CastRules
 Key: FLINK-25156
 URL: https://issues.apache.org/jira/browse/FLINK-25156
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


We need to support DISTINCT types either in CastRuleProvider or through some sort 
of meta rule, so that every rule can support them.





[jira] [Created] (FLINK-25157) Introduce NULL type to string cast rule

2021-12-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25157:
---

 Summary: Introduce NULL type to string cast rule
 Key: FLINK-25157
 URL: https://issues.apache.org/jira/browse/FLINK-25157
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-25131) Update sql client to ship flink-table-planner-loader instead of flink-table-planner

2021-12-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25131:
---

 Summary: Update sql client to ship flink-table-planner-loader 
instead of flink-table-planner
 Key: FLINK-25131
 URL: https://issues.apache.org/jira/browse/FLINK-25131
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Reporter: Francesco Guardiani


See the parent task for more details.





[jira] [Created] (FLINK-25130) Update flink-table-uber to ship flink-table-planner-loader instead of flink-table-planner

2021-12-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25130:
---

 Summary: Update flink-table-uber to ship 
flink-table-planner-loader instead of flink-table-planner
 Key: FLINK-25130
 URL: https://issues.apache.org/jira/browse/FLINK-25130
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


This should also be tested with the SQL client.

The change should then be covered by our e2e tests, specifically 
flink-end-to-end-tests/flink-batch-sql-test.





[jira] [Created] (FLINK-25129) Update docs and examples to use flink-table-planner-loader instead of flink-table-planner

2021-12-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25129:
---

 Summary: Update docs and examples to use 
flink-table-planner-loader instead of flink-table-planner
 Key: FLINK-25129
 URL: https://issues.apache.org/jira/browse/FLINK-25129
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Examples, Table SQL / API
Reporter: Francesco Guardiani


For more details, see 
https://docs.google.com/document/d/12yDUCnvcwU2mODBKTHQ1xhfOq1ujYUrXltiN_rbhT34/edit?usp=sharing





[jira] [Created] (FLINK-25128) Introduce flink-table-planner-loader

2021-12-01 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25128:
---

 Summary: Introduce flink-table-planner-loader
 Key: FLINK-25128
 URL: https://issues.apache.org/jira/browse/FLINK-25128
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


For more details, see 
https://docs.google.com/document/d/12yDUCnvcwU2mODBKTHQ1xhfOq1ujYUrXltiN_rbhT34/edit?usp=sharing





[jira] [Created] (FLINK-25114) Remove flink-scala dependency from flink-table-runtime

2021-11-30 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25114:
---

 Summary: Remove flink-scala dependency from flink-table-runtime
 Key: FLINK-25114
 URL: https://issues.apache.org/jira/browse/FLINK-25114
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


flink-table-runtime should no longer need flink-scala. We should 
try to remove it in order to help with the parent task 
[FLINK-24427|https://issues.apache.org/jira/browse/FLINK-24427]





[jira] [Created] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic

2021-11-30 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25113:
---

 Summary: Cleanup from Parquet and Orc the partition key handling 
logic
 Key: FLINK-25113
 URL: https://issues.apache.org/jira/browse/FLINK-25113
 Project: Flink
  Issue Type: Technical Debt
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Francesco Guardiani


After https://issues.apache.org/jira/browse/FLINK-24617 the partition key 
handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We should 
clean up this logic in the ORC and Parquet formats, in order to simplify them. Note: 
Hive still depends on this logic, but it should rather use 
{{FileInfoExtractorBulkFormat}} or similar.





[jira] [Created] (FLINK-25090) Run the assertj conversion script to convert assertions

2021-11-29 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25090:
---

 Summary: Run the assertj conversion script to convert assertions
 Key: FLINK-25090
 URL: https://issues.apache.org/jira/browse/FLINK-25090
 Project: Flink
  Issue Type: Bug
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


See https://assertj.github.io/doc/#assertj-migration-using-regexes for more 
details





[jira] [Created] (FLINK-25079) Add assertj assertions for table types

2021-11-26 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25079:
---

 Summary: Add assertj assertions for table types
 Key: FLINK-25079
 URL: https://issues.apache.org/jira/browse/FLINK-25079
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-25075) Remove reflection to instantiate PlannerExpressionParser

2021-11-26 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25075:
---

 Summary: Remove reflection to instantiate PlannerExpressionParser
 Key: FLINK-25075
 URL: https://issues.apache.org/jira/browse/FLINK-25075
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This reflection leaks the planner module classpath and can cause issues when 
isolating the classpath.





[jira] [Created] (FLINK-25061) Add assertj as dependency in flink-parent

2021-11-25 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25061:
---

 Summary: Add assertj as dependency in flink-parent
 Key: FLINK-25061
 URL: https://issues.apache.org/jira/browse/FLINK-25061
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


We recently discussed test assertions on the ML 
(https://lists.apache.org/thread/33t7hz8w873p1bc5msppk65792z08rgt) and came to 
the conclusion that we want to encourage contributors to use assertj as 
much as possible. To do that, we should add assertj to the flink-parent 
pom as a test dependency.





[jira] [Created] (FLINK-25060) Replace DataType.projectFields with Projection type

2021-11-25 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25060:
---

 Summary: Replace DataType.projectFields with Projection type
 Key: FLINK-25060
 URL: https://issues.apache.org/jira/browse/FLINK-25060
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


FLINK-24399 introduced new methods to perform data types projections in 
DataType. Note: no release included such changes.

FLINK-24776 introduced a new, more powerful type to perform operations on 
projections: projecting types, but also computing difference and complement.

To avoid providing different entrypoints for the same 
functionality, we should clean up the new methods introduced by FLINK-24399 and 
replace them with the new Projection type. We should also deprecate the 
functions in DataTypeUtils.
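To illustrate the kind of operations meant here, a hypothetical index-based projection over top-level fields, written against the JDK only. {{IndexProjection}} is not Flink's Projection type, whose exact semantics (for example for nested fields) may differ; in this sketch, difference is a plain, order-preserving set difference of indexes.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical sketch, not Flink's Projection type.
final class IndexProjection {
    private final int[] indexes; // selected top-level field positions, in order

    IndexProjection(int... indexes) {
        this.indexes = indexes.clone();
    }

    // Indexes of this projection that are not in the other one, order preserved.
    IndexProjection difference(IndexProjection other) {
        return new IndexProjection(Arrays.stream(indexes)
                .filter(i -> Arrays.stream(other.indexes).noneMatch(o -> o == i))
                .toArray());
    }

    // Indexes of a row with `arity` fields that this projection does not select.
    IndexProjection complement(int arity) {
        return new IndexProjection(IntStream.range(0, arity)
                .filter(i -> Arrays.stream(indexes).noneMatch(s -> s == i))
                .toArray());
    }

    // Applies the projection to field names (a stand-in for a row data type).
    String[] project(String[] fieldNames) {
        return Arrays.stream(indexes).mapToObj(i -> fieldNames[i]).toArray(String[]::new);
    }

    int[] toIndexes() {
        return indexes.clone();
    }
}
```

Having all three operations on one type is what makes a single entrypoint preferable to separate {{projectFields}}-style methods on DataType.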





[jira] [Created] (FLINK-25052) Port row to row cast logic to CastRule

2021-11-25 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25052:
---

 Summary: Port row to row cast logic to CastRule
 Key: FLINK-25052
 URL: https://issues.apache.org/jira/browse/FLINK-25052
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-25051) Port raw <-> binary logic to CastRule

2021-11-25 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25051:
---

 Summary: Port raw <-> binary logic to CastRule
 Key: FLINK-25051
 URL: https://issues.apache.org/jira/browse/FLINK-25051
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


More details on the parent task





[jira] [Created] (FLINK-24924) TO_TIMESTAMP and TO_DATE should fail

2021-11-16 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24924:
---

 Summary: TO_TIMESTAMP and TO_DATE should fail
 Key: FLINK-24924
 URL: https://issues.apache.org/jira/browse/FLINK-24924
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


In a similar fashion to what is described in 
https://issues.apache.org/jira/browse/FLINK-24385, TO_TIMESTAMP and TO_DATE 
should fail instead of returning {{null}}.

In particular for these two functions, a failure in parsing could lead to very 
unexpected behavior, for example it could lead to records with null rowtime.

We should change these functions to fail by default when parsing generates an 
error. Users who want to handle errors can use TRY_CAST for the same 
functionality:

{code:sql}
-- This fails when input is invalid
TO_TIMESTAMP(input)

-- Behaves the same as above
CAST(input AS TIMESTAMP)

-- This returns null when input is invalid
TRY_CAST(input AS TIMESTAMP)
{code}
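The difference between the two behaviors, sketched with {{java.time}} instead of Flink's internal parsing utilities (an assumption made to keep the example self-contained; {{TimestampParsing}} and the {{yyyy-MM-dd HH:mm:ss}} format are illustrative). The strict variant mirrors the proposed TO_TIMESTAMP/CAST semantics, the lenient one mirrors TRY_CAST.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical sketch of strict vs. lenient parsing, not Flink code.
final class TimestampParsing {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Strict: invalid input throws, so the error surfaces instead of a null rowtime.
    static LocalDateTime toTimestamp(String input) {
        return LocalDateTime.parse(input, FORMAT);
    }

    // Lenient: invalid input yields null, which the user explicitly opted into.
    static LocalDateTime tryToTimestamp(String input) {
        try {
            return LocalDateTime.parse(input, FORMAT);
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```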






[jira] [Created] (FLINK-24902) Port boolean <-> numeric casting logic to CastRule

2021-11-15 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24902:
---

 Summary: Port boolean <-> numeric casting logic to CastRule
 Key: FLINK-24902
 URL: https://issues.apache.org/jira/browse/FLINK-24902
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


More details on the parent task





[jira] [Created] (FLINK-24847) Decide the overflows behaviour

2021-11-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24847:
---

 Summary: Decide the overflows behaviour
 Key: FLINK-24847
 URL: https://issues.apache.org/jira/browse/FLINK-24847
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: Francesco Guardiani


Right now we have inconsistent behavior when it comes to overflows, 
depending on whether the value comes from a literal or from a value generated 
by the runtime (e.g. after a sum).

In particular, I tracked down an issue when trying to execute 
{{CAST(9.2345682E9):INTEGER}}, which returns {{644633299}} instead of 
{{2147483647}} (the result of {{(int)9234567891.12f}}). This happens because Calcite 
changes the type of the literal to INTEGER, completely skipping our casting logic in 
codegen and forcing us to generate a literal using 
{{literal.getValue().intValue()}} (note that Calcite uses {{BigDecimal}} for 
every numeric literal, regardless of the type).
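The two numbers above can be reproduced with plain JDK arithmetic. This sketch (class and method names are hypothetical) shows that a Java narrowing cast from {{float}} saturates at {{Integer.MAX_VALUE}}, while {{BigDecimal#intValue}}, which the literal path effectively uses, discards the fraction and keeps only the low-order 32 bits, so it wraps around. {{BigDecimal#intValueExact}} is included as one possible "throw on overflow" behavior.

```java
import java.math.BigDecimal;

// Hypothetical demo of the three overflow behaviors available in the JDK.
final class IntOverflowDemo {
    // Narrowing float -> int saturates at Integer.MAX_VALUE / MIN_VALUE.
    static int castFloatToInt(float f) {
        return (int) f;
    }

    // Drops the fraction, then keeps only the low-order 32 bits (wraps around).
    static int bigDecimalIntValue(BigDecimal d) {
        return d.intValue();
    }

    // Throws ArithmeticException on a nonzero fraction or an out-of-range value.
    static int bigDecimalIntValueExact(BigDecimal d) {
        return d.intValueExact();
    }
}
```

9234567891 - 2 * 2^32 = 644633299, which is exactly the wrapped value reported above.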

Relevant code related to the issue:

* {{RexBuilder#makeCast}}
* {{GenerateUtils#generateLiteral}}

This issue brings me to the following questions:

* Should we throw an error on overflows?
** If yes, should this be the default behavior or just something we configure 
behind a flag? 
** If no, should we have consistent and useful results when overflows happen (e.g. max 
value)?
*** If yes, what should be those overflow values? 
*** If no, do we keep everything as it is and document that the user needs to 
be careful about overflows? 





[jira] [Created] (FLINK-24843) DynamicTableFactory.Context.getCatalogTable().getPartitionKeys() should return indexes

2021-11-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24843:
---

 Summary: 
DynamicTableFactory.Context.getCatalogTable().getPartitionKeys() should return 
indexes
 Key: FLINK-24843
 URL: https://issues.apache.org/jira/browse/FLINK-24843
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


Right now invoking {{context.getCatalogTable().getPartitionKeys()}} returns 
field names. We should encourage users to use indexes, either by having a new method 
in the context or by adding a new method to CatalogTable that returns {{int[]}} and 
can be used in conjunction with {{DataType.excludeFields}}.
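A minimal sketch of the name-to-index resolution, assuming the physical field names are available as a list. {{PartitionKeyIndexes#toIndexes}} is a hypothetical helper, not an existing method on the context or on CatalogTable.

```java
import java.util.List;

// Hypothetical helper, not Flink API: resolves partition key names to their
// positions in the table's field list.
final class PartitionKeyIndexes {
    static int[] toIndexes(List<String> fieldNames, List<String> partitionKeys) {
        int[] indexes = new int[partitionKeys.size()];
        for (int i = 0; i < partitionKeys.size(); i++) {
            int idx = fieldNames.indexOf(partitionKeys.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown partition key: " + partitionKeys.get(i));
            }
            indexes[i] = idx;
        }
        return indexes;
    }
}
```

Returning positions rather than names lets callers drop or select the partition columns without repeated name lookups.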





[jira] [Created] (FLINK-24781) Port from string casting logic to CastRule

2021-11-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24781:
---

 Summary: Port from string casting logic to CastRule
 Key: FLINK-24781
 URL: https://issues.apache.org/jira/browse/FLINK-24781
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24780) Port time to time casting logic to CastRule

2021-11-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24780:
---

 Summary: Port time to time casting logic to CastRule
 Key: FLINK-24780
 URL: https://issues.apache.org/jira/browse/FLINK-24780
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


Port the various timestamp-related cast logic (timestamp to timestamp_ltz,
timestamp_ltz to timestamp, timestamp to timestamp and timestamp_ltz to
timestamp_ltz) to the new CastRule stack.
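For reference, the semantics of the two cross-type conversions can be sketched with {{java.time}} types standing in for Flink's internal timestamp representation. This assumes (our reading of Flink's time zone semantics) that both casts use the session time zone configured via {{table.local-time-zone}}; the class and method names are hypothetical, and this is not the CastRule code itself.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical sketch of the TIMESTAMP <-> TIMESTAMP_LTZ cast semantics using java.time. */
final class LtzCasts {

    /** TIMESTAMP -> TIMESTAMP_LTZ: read the wall-clock value as a time in the session zone. */
    static Instant toLtz(LocalDateTime timestamp, ZoneId sessionZone) {
        return timestamp.atZone(sessionZone).toInstant();
    }

    /** TIMESTAMP_LTZ -> TIMESTAMP: render the instant as wall-clock time in the session zone. */
    static LocalDateTime fromLtz(Instant instant, ZoneId sessionZone) {
        return instant.atZone(sessionZone).toLocalDateTime();
    }
}
```

The round trip returns the original value only when the wall-clock time is not inside a DST gap or overlap of the session zone.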





[jira] [Created] (FLINK-24779) Port Numeric casting logic to CastRule

2021-11-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24779:
---

 Summary: Port Numeric casting logic to CastRule
 Key: FLINK-24779
 URL: https://issues.apache.org/jira/browse/FLINK-24779
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


For more details, check the parent issue.





[jira] [Created] (FLINK-24778) Add DataTypes#ROW(List)

2021-11-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24778:
---

 Summary: Add DataTypes#ROW(List)
 Key: FLINK-24778
 URL: https://issues.apache.org/jira/browse/FLINK-24778
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Ecosystem
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This method helps reduce friction when working with a {{Stream}} of fields,
because it can be used as
{{Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW)}}.
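A self-contained sketch of the pattern, using hypothetical stand-ins ({{RowTypes.Field}} and {{RowTypes.row}}) that build a type string instead of a real {{DataType}}. The point is that a {{List}} overload, next to the existing varargs form, lets the method reference serve directly as the {{collectingAndThen}} finisher.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-ins for DataTypes.FIELD / DataTypes.ROW that produce a type string. */
final class RowTypes {

    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public String toString() {
            return "`" + name + "` " + type;
        }
    }

    /** Varargs form, analogous to the existing ROW(Field...). */
    static String row(Field... fields) {
        return row(Arrays.asList(fields));
    }

    /** List form, analogous to the proposed ROW(List). */
    static String row(List<Field> fields) {
        return fields.stream()
                .map(Field::toString)
                .collect(Collectors.joining(", ", "ROW<", ">"));
    }

    /** The List overload makes RowTypes::row usable as the finisher of collectingAndThen. */
    static String fromStream(Stream<Field> fields) {
        return fields.collect(Collectors.collectingAndThen(Collectors.toList(), RowTypes::row));
    }
}
```

Calling {{fromStream}} on the fields {{id INT}} and {{name STRING}} returns {{ROW<`id` INT, `name` STRING>}}.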





[jira] [Created] (FLINK-24776) Clarify DecodingFormat

2021-11-04 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24776:
---

 Summary: Clarify DecodingFormat
 Key: FLINK-24776
 URL: https://issues.apache.org/jira/browse/FLINK-24776
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface
does not have clear requirements, and it's confusing for implementers. In
particular, it's unclear whether the format needs to support projection push
down or not, and whether the {{DataType}} provided to {{createRuntimeDecoder}}
is projected and includes partition keys or not. An example of such a
misunderstanding is shown here:
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107

The PR https://github.com/apache/flink/pull/17544 partially addresses the
issue, because it removes the need for {{BulkFormat}} implementations to take
care of partition key handling. Nevertheless, it's still unclear whether formats
support projections or not, and whether they support nested projections.

We should refactor {{DecodingFormat}} as follows:

* We document that every {{DecodingFormat}} *MUST* support projections. This is
already the case for every format we have (see
https://github.com/apache/flink/pull/17544#issuecomment-953184692). A
{{DecodingFormat}} *MAY* also support nested projections, which is signaled by
a new method {{DecodingFormat#supportsNestedProjection()}}.
* Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context,
DataType physicalDataType, int[][] projections)}} that implementers should now
implement. The {{physicalDataType}} in this signature is the physical data type
from the table schema, stripped of metadata columns and partition keys, with
fields in the order defined by the table schema. The implementer can compute the
final type with {{DataType.projectFields(physicalDataType, projections)}}.
* Deprecate the old {{createRuntimeDecoder}}.
* Provide default implementations of both the new and the old
{{createRuntimeDecoder}} to ensure backward compatibility.

As an alternative, we ([~twalthr] and I) explored the idea that formats might
not support projection push down, but this is very unlikely, and such a change
would require several planner changes, including breaking the
{{SupportsProjectionPushDown}} interface.

We should also provide a {{RowData}} implementation that takes care of
projection internally, so that a {{DecodingFormat}} implementer that doesn't
want to support projections can simply wrap its output with it, like:
{{new ProjectedRowData(rowDataProducedByFormat, projections)}}.
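The {{int[][]}} projections are index paths: each inner array walks into (possibly nested) fields, so {{[[2, 1], [0]]}} selects field 1 of the nested row at position 2, then field 0. Below is a simplified sketch of such a wrapper using {{Object[]}} rows instead of {{RowData}}; {{ProjectedRow}} is a hypothetical name for illustration, not the eventual {{ProjectedRowData}} implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified analogue of a projecting row wrapper: rows are Object[] (nested rows
 * are nested Object[]), and each int[] in the projection is a path of field indexes.
 */
final class ProjectedRow {
    private final Object[] row;
    private final int[][] projections;

    ProjectedRow(Object[] row, int[][] projections) {
        this.row = row;
        this.projections = projections;
    }

    int getArity() {
        return projections.length;
    }

    /** Resolves the pos-th projected field lazily by walking its index path (paths assumed valid). */
    Object getField(int pos) {
        Object current = row;
        for (int index : projections[pos]) {
            if (current == null) {
                return null; // a null nested row projects to null
            }
            current = ((Object[]) current)[index];
        }
        return current;
    }

    @Override
    public String toString() {
        Object[] values = new Object[getArity()];
        for (int i = 0; i < values.length; i++) {
            values[i] = getField(i);
        }
        return Arrays.deepToString(values);
    }
}
```

The wrapper copies nothing up front; each access follows the path on the underlying row, which is the property that lets a format ignore projections and still produce correctly shaped rows.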





[jira] [Created] (FLINK-24752) Cleanup ScalarOperatorGens#generateCast

2021-11-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24752:
---

 Summary: Cleanup ScalarOperatorGens#generateCast
 Key: FLINK-24752
 URL: https://issues.apache.org/jira/browse/FLINK-24752
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


https://issues.apache.org/jira/browse/FLINK-24462 introduced a new stack to 
implement cast rules. We need to port all our casting logic to the new stack.





[jira] [Created] (FLINK-24747) Add producedDataType to SupportsProjectionPushDown.applyProjection

2021-11-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24747:
---

 Summary: Add producedDataType to 
SupportsProjectionPushDown.applyProjection
 Key: FLINK-24747
 URL: https://issues.apache.org/jira/browse/FLINK-24747
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani


{{SupportsProjectionPushDown.applyProjection}} should provide the
{{producedDataType}} for consistency with
{{SupportsReadingMetadata.applyReadableMetadata}}.





[jira] [Created] (FLINK-24685) Use the new casting rules in TableResult#print

2021-10-28 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24685:
---

 Summary: Use the new casting rules in TableResult#print
 Key: FLINK-24685
 URL: https://issues.apache.org/jira/browse/FLINK-24685
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-24684) Port to string casting rules to the new CastRule interface

2021-10-28 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24684:
---

 Summary: Port to string casting rules to the new CastRule interface
 Key: FLINK-24684
 URL: https://issues.apache.org/jira/browse/FLINK-24684
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani








[jira] [Created] (FLINK-24673) Deprecate old Row SerializationSchema/DeserializationSchema

2021-10-27 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24673:
---

 Summary: Deprecate old Row 
SerializationSchema/DeserializationSchema
 Key: FLINK-24673
 URL: https://issues.apache.org/jira/browse/FLINK-24673
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


As discussed in the [mailing list|https://lists.apache.org/thread.html/red2f04ac782f2cd156a639a44c9962b7b92659b76e5ff683de664534%40%3Cdev.flink.apache.org%3E],
we should deprecate the following classes in the next version:

   - org.apache.flink.formats.json.JsonRowSerializationSchema
   - org.apache.flink.formats.json.JsonRowDeserializationSchema
   - org.apache.flink.formats.avro.AvroRowSerializationSchema
   - org.apache.flink.formats.avro.AvroRowDeserializationSchema
   - org.apache.flink.formats.csv.CsvRowDeserializationSchema
   - org.apache.flink.formats.csv.CsvRowSerializationSchema





[jira] [Created] (FLINK-24617) Support partition keys through metadata

2021-10-22 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24617:
---

 Summary: Support partition keys through metadata
 Key: FLINK-24617
 URL: https://issues.apache.org/jira/browse/FLINK-24617
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


Right now we have a lot of code spread across the various formats to support
partition keys. From the {{FileSystemTableSource}} point of view, these can be
handled by reusing the metadata support provided by
https://issues.apache.org/jira/browse/FLINK-24165. We should clean up all that
partition key support code and just reuse what we have for metadata.
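Once partition keys are treated as metadata, the source only has to derive the per-file partition values from the path, independently of the format. A minimal sketch, assuming Hive-style {{key=value}} directories and ignoring path escaping of partition values; {{PartitionPath}} is a hypothetical helper, not existing Flink code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: extracts Hive-style key=value partition values from a file path. */
final class PartitionPath {

    /** Returns partition values in path order; escaping of special characters is not handled. */
    static Map<String, String> extract(String path) {
        Map<String, String> partitions = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                partitions.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return partitions;
    }
}
```

For {{/warehouse/orders/dt=2021-10-22/hour=10/part-0.csv}} this yields the entries {{dt -> 2021-10-22}} and {{hour -> 10}}, in path order.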





[jira] [Created] (FLINK-24616) Expose all metadata in FileStatus

2021-10-22 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24616:
---

 Summary: Expose all metadata in FileStatus
 Key: FLINK-24616
 URL: https://issues.apache.org/jira/browse/FLINK-24616
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


{{FileStatus}} provides several useful pieces of metadata, including access time,
creation time, etc. We should expose them through the metadata support in the
filesystem connector added by https://issues.apache.org/jira/browse/FLINK-24165.
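A rough illustration of the kind of per-file values involved, using the JDK's {{java.nio.file.attribute.BasicFileAttributes}} as a stand-in for Flink's {{FileStatus}}. The metadata keys such as {{file.size}} are hypothetical column names chosen for illustration, not an agreed naming scheme.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.LinkedHashMap;
import java.util.Map;

/** Collects per-file metadata that a source could expose as metadata columns. */
final class FileMetadata {

    static Map<String, Object> read(Path file) throws IOException {
        BasicFileAttributes attrs = Files.readAttributes(file, BasicFileAttributes.class);
        Map<String, Object> metadata = new LinkedHashMap<>();
        // Hypothetical metadata keys, for illustration only.
        metadata.put("file.path", file.toString());
        metadata.put("file.size", attrs.size());
        metadata.put("file.creation-time", attrs.creationTime().toInstant());
        metadata.put("file.access-time", attrs.lastAccessTime().toInstant());
        metadata.put("file.modification-time", attrs.lastModifiedTime().toInstant());
        return metadata;
    }
}
```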





[jira] [Created] (FLINK-24615) Add infrastructure to support metadata in filesystem connector

2021-10-22 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24615:
---

 Summary: Add infrastructure to support metadata in filesystem 
connector
 Key: FLINK-24615
 URL: https://issues.apache.org/jira/browse/FLINK-24615
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


{{FileSystemTableSource}} should implement {{SupportsReadingMetadata}}, and we
need a way to propagate file system metadata without adding code to every
format.
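One way to avoid per-format code is a decorator around whatever decoder the format provides, appending the per-file metadata values to every decoded row. A hypothetical sketch with {{Object[]}} rows and {{java.util.function.Function}} standing in for the format's decoder; {{MetadataAppendingDecoder}} is not an existing Flink class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical sketch: wraps an arbitrary format decoder and appends the same per-file metadata
 * values to every row it produces, so individual formats need no metadata-specific code.
 */
final class MetadataAppendingDecoder<IN> implements Function<IN, Object[]> {
    private final Function<IN, Object[]> formatDecoder;
    private final Object[] fileMetadata;

    MetadataAppendingDecoder(Function<IN, Object[]> formatDecoder, List<Object> fileMetadata) {
        this.formatDecoder = formatDecoder;
        this.fileMetadata = fileMetadata.toArray();
    }

    @Override
    public Object[] apply(IN record) {
        Object[] physical = formatDecoder.apply(record);
        // Force an Object[] copy: the format may return a narrower array type such as String[].
        Object[] joined = Arrays.copyOf(physical, physical.length + fileMetadata.length, Object[].class);
        System.arraycopy(fileMetadata, 0, joined, physical.length, fileMetadata.length);
        return joined;
    }
}
```

The metadata values are computed once per file and shared by every row of that file; the format only ever sees its own physical fields.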





[jira] [Created] (FLINK-24575) Port TestCsvFileSystemFormatFactory to DeserializationSchema

2021-10-18 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24575:
---

 Summary: Port TestCsvFileSystemFormatFactory to 
DeserializationSchema
 Key: FLINK-24575
 URL: https://issues.apache.org/jira/browse/FLINK-24575
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-24566) Remove CsvFileSystemFormatFactory

2021-10-15 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24566:
---

 Summary: Remove CsvFileSystemFormatFactory
 Key: FLINK-24566
 URL: https://issues.apache.org/jira/browse/FLINK-24566
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


The {{DeserializationSchema}} implementation we already have should be enough.





[jira] [Created] (FLINK-24565) Port Avro FileSystemFormatFactory to StreamFormat

2021-10-15 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24565:
---

 Summary: Port Avro FileSystemFormatFactory to StreamFormat
 Key: FLINK-24565
 URL: https://issues.apache.org/jira/browse/FLINK-24565
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Caizhi Weng








[jira] [Created] (FLINK-24544) Failure when using Kafka connector in Table API with Avro and Confluent schema registry

2021-10-14 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24544:
---

 Summary: Failure when using Kafka connector in Table API with Avro 
and Confluent schema registry 
 Key: FLINK-24544
 URL: https://issues.apache.org/jira/browse/FLINK-24544
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile), Table SQL / Ecosystem
Affects Versions: 1.13.1
Reporter: Francesco Guardiani
 Attachments: flink-deser-avro-enum.zip

A user reported on the [mailing list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E]
that Avro deserialization fails when using Kafka, Avro and the Confluent Schema
Registry:

{code:java}
Caused by: java.io.IOException: Failed to deserialize Avro record.
  at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
  at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
  at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
  at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
  at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
  at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
  at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
  at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
  at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
  at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
  at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
  ... 9 more
{code}

See the attached code for a reproducer.






[jira] [Created] (FLINK-24507) Cleanup all the various date/time/timestamp utils

2021-10-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24507:
---

 Summary: Cleanup all the various date/time/timestamp utils
 Key: FLINK-24507
 URL: https://issues.apache.org/jira/browse/FLINK-24507
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


There are different timestamp utils that won't be required anymore once
FLINK-21456 is solved. For example:

* {{org.apache.flink.table.util.TimestampStringUtils}}
* {{org.apache.flink.table.utils.TimestampStringUtils}}

Double-check all the usages of these classes (and of any similar classes), and
replace them with the new {{DateTimeUtils}} in common wherever possible.





[jira] [Created] (FLINK-24500) Move SqlDateTimeUtils to table-common

2021-10-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24500:
---

 Summary: Move SqlDateTimeUtils to table-common
 Key: FLINK-24500
 URL: https://issues.apache.org/jira/browse/FLINK-24500
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-24499) OVER IGNORE NULLS support

2021-10-11 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24499:
---

 Summary: OVER IGNORE NULLS support
 Key: FLINK-24499
 URL: https://issues.apache.org/jira/browse/FLINK-24499
 Project: Flink
  Issue Type: New Feature
Reporter: Francesco Guardiani


The OVER window specification has 2 keywords to specify how to deal with null
values: IGNORE NULLS and RESPECT NULLS. They are specified in SQL:2008, section
6.10.
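The difference between the two null treatments can be sketched for a running {{LAST_VALUE}}, assuming a frame of {{ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW}} over an already ordered partition. In standard SQL the non-ignoring variant is spelled {{RESPECT NULLS}} and is the default. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: running LAST_VALUE over an ordered partition, with both null treatments. */
final class NullTreatment {

    /** RESPECT NULLS: the current (last) row's value, even when it is null. */
    static List<Integer> lastValueRespectNulls(List<Integer> values) {
        return new ArrayList<>(values);
    }

    /** IGNORE NULLS: the most recent non-null value seen so far (null if there is none yet). */
    static List<Integer> lastValueIgnoreNulls(List<Integer> values) {
        List<Integer> out = new ArrayList<>();
        Integer last = null;
        for (Integer v : values) {
            if (v != null) {
                last = v;
            }
            out.add(last);
        }
        return out;
    }
}
```

For the input {{[1, null, 3, null]}}, the first method returns {{[1, null, 3, null]}} and the second returns {{[1, 1, 3, 3]}}.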





[jira] [Created] (FLINK-24466) Interval Join late events handling behaviour is not consistent

2021-10-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24466:
---

 Summary: Interval Join late events handling behaviour is not 
consistent
 Key: FLINK-24466
 URL: https://issues.apache.org/jira/browse/FLINK-24466
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Francesco Guardiani
 Attachments: Fix_late_events_filtering_for_interval_join.patch

Interval Join handles late events by emitting them in the output as padded
rows. This behavior is also tested extensively in {{RowTimeIntervalJoinTest}}.

The problem with this behavior is how an event is classified as "late" or not:
to distinguish between the two, {{RowTimeIntervalJoin}} uses
{{ctx.timerService().currentWatermark()}} to find out whether an event is behind
the last received watermark. But that method returns the "combined" watermark
across all the keys, partitions and *input streams*: if one of the two streams
is "slower" than the other, the returned watermark is the minimum of the two.

This means that our late event handling effectively works only if the two
streams run "at the same pace"; otherwise, events that we would consider _late_
for one of the two streams still end up joined.

To observe this behavior, run the test
{{IntervalJoinITCase#testRowTimeInnerJoinWithEquiTimeAttrs}} at this revision
https://github.com/apache/flink/commit/7033cbfe404bea1519d3342a611e2f92768d70f9
several times: after a couple of runs it fails, joining one of the
{{"should-be-discarded"}} records. Those records are way behind the
watermark - 1 second, as defined.

A small patch showing how this could be fixed is attached to the issue.
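The discrepancy can be reproduced in isolation. The hypothetical sketch below tracks one watermark per input next to the combined one (the minimum of both, which is what a two-input operator observes); it only illustrates the classification problem and is not necessarily what the attached patch does. As with Flink watermarks, an event counts as late when its timestamp is less than or equal to the watermark.

```java
/**
 * Hypothetical sketch contrasting the combined watermark (minimum across both inputs) with a
 * per-input watermark when deciding whether an event is late.
 */
final class TwoInputWatermarks {
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    void advanceLeft(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
    }

    void advanceRight(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
    }

    /** Analogous to what currentWatermark() reports in a two-input operator. */
    long combinedWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    boolean isLateByCombined(long timestamp) {
        return timestamp <= combinedWatermark();
    }

    /** Checks a left-side event against the left input's own watermark only. */
    boolean isLeftLateByOwnInput(long timestamp) {
        return timestamp <= leftWatermark;
    }
}
```

With the left input at watermark 10000 and the right one at 2000, a left event at 5000 is not late according to the combined watermark, but it is late with respect to its own input.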





[jira] [Created] (FLINK-24462) Refactor casting rules in a similar fashion to DataStructureConverter

2021-10-06 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24462:
---

 Summary: Refactor casting rules in a similar fashion to 
DataStructureConverter
 Key: FLINK-24462
 URL: https://issues.apache.org/jira/browse/FLINK-24462
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani








[jira] [Created] (FLINK-24461) TableResult#print() should use internal data types

2021-10-06 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24461:
---

 Summary: TableResult#print() should use internal data types
 Key: FLINK-24461
 URL: https://issues.apache.org/jira/browse/FLINK-24461
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


The collector used by {{TableResult#print()}} should use internal data types.





[jira] [Created] (FLINK-24363) java.io.FileNotFoundException crashes the standalone session manager

2021-09-23 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24363:
---

 Summary: java.io.FileNotFoundException crashes the standalone 
session manager
 Key: FLINK-24363
 URL: https://issues.apache.org/jira/browse/FLINK-24363
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Table SQL / Runtime
Affects Versions: shaded-14.0
 Environment: Flink 1.14-SNAPSHOT
Reporter: Francesco Guardiani


When trying to execute a query that reads from a table configured with the
filesystem connector and a bad path, the session manager crashes and the SQL
client query execution times out:


{code:java}
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_2.
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:629) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
   at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:391) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
   at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0]
   ... 20 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
   at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
   at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:584) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
   at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
   at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882)
 

[jira] [Created] (FLINK-24359) Migrate FileSystem connector to ResolvedSchema

2021-09-23 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24359:
---

 Summary: Migrate FileSystem connector to ResolvedSchema
 Key: FLINK-24359
 URL: https://issues.apache.org/jira/browse/FLINK-24359
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Ecosystem
 Environment: Flink 1.14-SNAPSHOT
Reporter: Francesco Guardiani


The filesystem connector uses the deprecated {{TableSchema}} APIs. This causes
issues with the Table API, because
{{TableSchema#fromResolvedSchema(ResolvedSchema)}} requires the expressions to be
serializable strings ({{ResolvedExpression#asSerializableString}}).

For example:
{code:java}
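import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
        .schema(
                Schema.newBuilder()
                        .column("character", DataTypes.STRING())
                        .column("latitude", DataTypes.STRING())
                        .column("longitude", DataTypes.STRING())
                        .column("time", DataTypes.TIMESTAMP(3))
                        .watermark("time", $("time").minus(lit(2).seconds()))
                        .build())
        // Other options
        .build();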
{code}

When used in a Table API pipeline, this descriptor throws the following exception:


{code:java}
Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
{code}

The same table definition using SQL works fine:
{code:java}
CREATE TABLE IF NOT EXISTS LocationEvents (
    `character` STRING,
    `latitude` STRING,
    `longitude` STRING,
    `time` TIMESTAMP(3),
    WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
) WITH (
    -- Load from filesystem
    'connector' = 'filesystem',
    --- Other configs
);
{code}





[jira] [Created] (FLINK-24225) Define and implement UPSERT INTO semantics

2021-09-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24225:
---

 Summary: Define and implement UPSERT INTO semantics
 Key: FLINK-24225
 URL: https://issues.apache.org/jira/browse/FLINK-24225
 Project: Flink
  Issue Type: Bug
Reporter: Francesco Guardiani


In https://issues.apache.org/jira/browse/FLINK-22942, UPSERT INTO was
disabled. We should define the correct behaviour for UPSERT INTO and implement
it.





[jira] [Created] (FLINK-24214) A submit job failure crashes the sql client

2021-09-08 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24214:
---

 Summary: A submit job failure crashes the sql client
 Key: FLINK-24214
 URL: https://issues.apache.org/jira/browse/FLINK-24214
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.2
 Environment: Flink 1.13.2
Ubuntu 21.04
Java 8
Reporter: Francesco Guardiani


I've noticed that when a valid query hits a "bad" error while being submitted
to the Flink cluster, the client crashes, and the beginning of the stack trace
is misleading. For example:


{code:java}
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
Caused by: java.lang.RuntimeException: Error running SQL job.
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:606)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:603)
... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment) -> SourceConversion(table=[default_catalog.default_database.prod_orders, source: [KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment)]], fields=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment]) -> Timestamps/Watermarks -> SinkConversionToRow -> Map -> Sink: CsvTableSink(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency,

[jira] [Created] (FLINK-24170) Building a fresh clone with latest Maven fails

2021-09-06 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24170:
---

 Summary: Building a fresh clone with latest Maven fails
 Key: FLINK-24170
 URL: https://issues.apache.org/jira/browse/FLINK-24170
 Project: Flink
  Issue Type: Bug
  Components: Build System
 Environment: Maven: 3.8.2

JDK: 8

OS: Ubuntu 21.04
Reporter: Francesco Guardiani


Building with the latest Maven fails during the {{highest-dir}} goal:

{quote}{{Cannot find a single highest directory for this project set. First two candidates directories don't share a common root.}}
{quote}

As suggested in this Stack Overflow answer, the {{directory-of}} goal works
better.




