[jira] [Created] (FLINK-27201) Remove CollectTableSink
Francesco Guardiani created FLINK-27201:
---
Summary: Remove CollectTableSink
Key: FLINK-27201
URL: https://issues.apache.org/jira/browse/FLINK-27201
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

--
This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27200) Replace CollectionTableSource and CollectionTableSink with new table stack alternatives
Francesco Guardiani created FLINK-27200:
---
Summary: Replace CollectionTableSource and CollectionTableSink with new table stack alternatives
Key: FLINK-27200
URL: https://issues.apache.org/jira/browse/FLINK-27200
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani

We need to replace the old CollectionTableSource and CollectionTableSink with alternatives from the new stack, for example TestValuesTableFactory or a simpler, more straightforward alternative based on TableFactoryHarness. This task requires identifying an alternative to the old CollectionTableSource and CollectionTableSink, creating a new one if necessary, and then replacing their usages in tests.
[jira] [Created] (FLINK-27198) Cleanup tests testing legacy TableSink/TableSource
Francesco Guardiani created FLINK-27198:
---
Summary: Cleanup tests testing legacy TableSink/TableSource
Key: FLINK-27198
URL: https://issues.apache.org/jira/browse/FLINK-27198
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani

We have many mock TableSink/TableSource implementations that are used only to test features of the old TableSink/TableSource stack. We should identify them and remove them together with their tests. A non-exhaustive list:
{{Implementations of TableSink in (25 usages found)}}
{{  Test (14 usages found)}}
{{    flink-python_2.12 (4 usages found)}}
{{      org.apache.flink.table.legacyutils (4 usages found)}}
{{        legacyTestingSinks.scala (3 usages found)}}
{{          39 private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {}}
{{          69 private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {}}
{{          95 private[flink] class TestUpsertSink(}}
{{        TestCollectionTableFactory.scala (1 usage found)}}
{{          168 class CollectionTableSink(val outputType: RowTypeInfo)}}
{{    flink-table-common (1 usage found)}}
{{      org.apache.flink.table.utils (1 usage found)}}
{{        TypeMappingUtilsTest.java (1 usage found)}}
{{          419 private static class TestTableSink implements TableSink> {}}
{{    flink-table-planner_2.12 (9 usages found)}}
{{      org.apache.flink.table.planner.factories.utils (1 usage found)}}
{{        TestCollectionTableFactory.scala (1 usage found)}}
{{          152 class CollectionTableSink(val schema: TableSchema)}}
{{      org.apache.flink.table.planner.runtime.batch.sql (1 usage found)}}
{{        PartitionableSinkITCase.scala (1 usage found)}}
{{          331 private class TestSink(}}
{{      org.apache.flink.table.planner.runtime.utils (3 usages found)}}
{{        StreamTestSink.scala (3 usages found)}}
{{          265 final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)}}
{{          344 final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] {}}
{{          499 final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink[Row] {}}
{{      org.apache.flink.table.planner.utils (4 usages found)}}
{{        MemoryTableSourceSinkUtil.scala (3 usages found)}}
{{          80 final class UnsafeMemoryAppendTableSink}}
{{          146 final class DataTypeOutputFormatTableSink(}}
{{          181 final class DataTypeAppendStreamTableSink(}}
{{        testTableSourceSinks.scala (1 usage found)}}
{{          1317 class OptionsTableSink(}}
{{Implementations of TableSource in (31 usages found)}}
{{  Production (8 usages found)}}
{{    flink-batch-sql-test (1 usage found)}}
{{      org.apache.flink.sql.tests (1 usage found)}}
{{        BatchSQLTestProgram.java (1 usage found)}}
{{          79 public static class GeneratorTableSource extends InputFormatTableSource {}}
{{    flink-stream-sql-test_2.12 (1 usage found)}}
{{      org.apache.flink.sql.tests (1 usage found)}}
{{        StreamSQLTestProgram.java (1 usage found)}}
{{          192 public static class GeneratorTableSource}}
{{  Test (23 usages found)}}
{{    flink-python_2.12 (1 usage found)}}
{{      org.apache.flink.table.legacyutils (1 usage found)}}
{{        TestCollectionTableFactory.scala (1 usage found)}}
{{          127 class CollectionTableSource(}}
{{    flink-sql-client_2.12 (1 usage found)}}
{{      org.apache.flink.table.client.gateway.utils (1 usage found)}}
{{        SimpleCatalogFactory.java (1 usage found)}}
{{          95 new StreamTableSource() {}}
{{    flink-table-api-java (1 usage found)}}
{{      org.apache.flink.table.utils (1 usage found)}}
{{        TableSourceMock.java (1 usage found)}}
{{          27 public class TableSourceMock implements TableSource {}}
{{    flink-table-common (1 usage found)}}
{{      org.apache.flink.table.utils (1 usage found)}}
{{        TypeMappingUtilsTest.java (1 usage found)}}
{{          375 private static class TestTableSource}}
{{    flink-table-planner_2.12 (19 usages found)}}
{{      org.apache.flink.table.planner.factories.utils (1 usage found)}}
{{        TestCollectionTableFactory.scala (1 usage found)}}
{{          115 class CollectionTableSource(}}
{{      org.apache.flink.table.planner.plan.stream.sql.join (2 usages found)}}
{{        LookupJoinTest.scala (2 usages found)}}
{{
[jira] [Created] (FLINK-27197) Port ArrowTableSource and PythonInputFormatTableSource to DynamicTableSource
Francesco Guardiani created FLINK-27197:
---
Summary: Port ArrowTableSource and PythonInputFormatTableSource to DynamicTableSource
Key: FLINK-27197
URL: https://issues.apache.org/jira/browse/FLINK-27197
Project: Flink
Issue Type: Sub-task
Components: API / Python
Reporter: Francesco Guardiani
[jira] [Created] (FLINK-27196) Remove old format interfaces
Francesco Guardiani created FLINK-27196:
---
Summary: Remove old format interfaces
Key: FLINK-27196
URL: https://issues.apache.org/jira/browse/FLINK-27196
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-27185) Run the assertj conversion script to convert assertions in connectors and formats
Francesco Guardiani created FLINK-27185:
---
Summary: Run the assertj conversion script to convert assertions in connectors and formats
Key: FLINK-27185
URL: https://issues.apache.org/jira/browse/FLINK-27185
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-27043) Remove CSV connector test usages
Francesco Guardiani created FLINK-27043:
---
Summary: Remove CSV connector test usages
Key: FLINK-27043
URL: https://issues.apache.org/jira/browse/FLINK-27043
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26988) Better error reporting when format fails
Francesco Guardiani created FLINK-26988:
---
Summary: Better error reporting when format fails
Key: FLINK-26988
URL: https://issues.apache.org/jira/browse/FLINK-26988
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Reporter: Francesco Guardiani

Today, when a format fails, there might be no information about the split (file name, offset, etc.), depending on the format implementation. This makes format failures hard to debug. For example, here is what I've seen with the CSV format:
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:531) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:227) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:841) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:767) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.RuntimeException: Invalid UTF-8 middle byte 0x54 (at char #3529, byte #3528): check content encoding, does not look like UTF-8
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator._handleIOException(MappingIterator.java:417) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.hasNext(MappingIterator.java:190) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.formats.csv.CsvReaderFormat$Reader.read(CsvReaderFormat.java:194) ~[flink-csv-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.file.src.impl.StreamFormatAdapter$Reader.readBatch(StreamFormatAdapter.java:214) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-connector-files-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
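In the trace above, the root cause ("Invalid UTF-8 middle byte") names a character and byte position but not the file being read. A minimal sketch of one way to attach that context, assuming the reader loop has the split's file path and offset at hand: every failure is rethrown with the path and offset in the message and the original exception kept as the cause. SplitContextSketch, RecordReader and readWithContext are hypothetical names for illustration, not Flink's FileSourceSplitReader or StreamFormat API.

```java
import java.io.IOException;
import java.util.Objects;

/**
 * Hypothetical sketch: wrap any failure of a format reader with the split it
 * was reading (file path and offset). Not Flink's API.
 */
public final class SplitContextSketch {

    /** Minimal stand-in for a format reader that reads one record. */
    interface RecordReader<T> {
        T read() throws IOException;
    }

    /** Reads one record; any failure is rethrown with the split context in the message. */
    static <T> T readWithContext(RecordReader<T> reader, String filePath, long offset)
            throws IOException {
        Objects.requireNonNull(reader, "reader");
        try {
            return reader.read();
        } catch (IOException | RuntimeException e) {
            // Keep the original exception as the cause so the root cause is not lost.
            throw new IOException(
                    String.format("Failed to read split (file: %s, offset: %d)", filePath, offset),
                    e);
        }
    }
}
```

With this wrapper, a failure such as the UTF-8 error above would surface as "Failed to read split (file: ..., offset: ...)" followed by the original exception as its cause.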
[jira] [Created] (FLINK-26952) Remove old CSV connector
Francesco Guardiani created FLINK-26952:
---
Summary: Remove old CSV connector
Key: FLINK-26952
URL: https://issues.apache.org/jira/browse/FLINK-26952
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

This involves removing all references to the old CSV connector from our tests (mostly TableEnvironmentITCase, TableEnvironmentTest and TableITCase), and removing its usage from a couple of e2e tests.
[jira] [Created] (FLINK-26950) Remove TableSink and TableSource interfaces
Francesco Guardiani created FLINK-26950:
---
Summary: Remove TableSink and TableSource interfaces
Key: FLINK-26950
URL: https://issues.apache.org/jira/browse/FLINK-26950
Project: Flink
Issue Type: Technical Debt
Reporter: Francesco Guardiani

This issue involves removing the old TableSink and TableSource stack, which has been superseded by the new DynamicTableSink and DynamicTableSource.
[jira] [Created] (FLINK-26782) Remove PlannerExpression and related
Francesco Guardiani created FLINK-26782:
---
Summary: Remove PlannerExpression and related
Key: FLINK-26782
URL: https://issues.apache.org/jira/browse/FLINK-26782
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26704) Remove string expression DSL
Francesco Guardiani created FLINK-26704:
---
Summary: Remove string expression DSL
Key: FLINK-26704
URL: https://issues.apache.org/jira/browse/FLINK-26704
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26582) Run the assertj conversion script to convert assertions in flink-table
Francesco Guardiani created FLINK-26582:
---
Summary: Run the assertj conversion script to convert assertions in flink-table
Key: FLINK-26582
URL: https://issues.apache.org/jira/browse/FLINK-26582
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26553) Enable scalafmt for scala codebase
Francesco Guardiani created FLINK-26553:
---
Summary: Enable scalafmt for scala codebase
Key: FLINK-26553
URL: https://issues.apache.org/jira/browse/FLINK-26553
Project: Flink
Issue Type: Bug
Components: Build System
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

As discussed in https://lists.apache.org/thread/97398pc9cb8y922xlb6mzlsbjtjf5jnv, we should enable scalafmt in our codebase.
[jira] [Created] (FLINK-26551) Make the legacy behavior disabled by default
Francesco Guardiani created FLINK-26551:
---
Summary: Make the legacy behavior disabled by default
Key: FLINK-26551
URL: https://issues.apache.org/jira/browse/FLINK-26551
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani

Follow-up of https://issues.apache.org/jira/browse/FLINK-25111. For the discussion, see https://lists.apache.org/thread/r13y3plwwyg3sngh8cz47flogq621txv
[jira] [Created] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types
Francesco Guardiani created FLINK-26549:
---
Summary: INSERT INTO with VALUES leads to wrong type inference with nested types
Key: FLINK-26549
URL: https://issues.apache.org/jira/browse/FLINK-26549
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Francesco Guardiani

While working on casting, I found an interesting bug in the type inference of INSERT INTO ... VALUES. It surfaces in {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (see this version in particular: https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java). The test scenario is an INSERT INTO ... VALUES query that also writes metadata to a Kafka table, in particular the headers metadata. The table is declared as follows:
{code:sql}
CREATE TABLE kafka (
  `physical_1` STRING,
  `physical_2` INT,
  `timestamp-type` STRING METADATA VIRTUAL,
  `timestamp` TIMESTAMP(3) METADATA,
  `leader-epoch` INT METADATA VIRTUAL,
  `headers` MAP METADATA,
  `partition` INT METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
  `physical_3` BOOLEAN
) WITH (
  'connector' = 'kafka',
  [...]
)
{code}
The INSERT INTO query looks like this:
{code:sql}
INSERT INTO kafka
VALUES
  ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
  ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP), FALSE),
  ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
{code}
Note that in the first row the byte literals are 3 and 2 bytes long, while in the last row both byte literals are 1 byte long.
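Because VALUES must derive a single type for the headers column across all rows, the value type of the MAP has to accommodate the longest byte literal. A toy model of that derivation, assuming a simplified rule that is consistent with the literal types shown in the generated plan (X'c0ffee' and X'babe' together become VARBINARY(3), two 1-byte literals stay BINARY(1)): equal fixed lengths keep BINARY(n), anything else widens to VARBINARY(max length). BinaryTypeWidening and its methods are hypothetical, not Flink's or Calcite's type-merging code.

```java
/**
 * Toy model (not Flink or Calcite code) of deriving the common type of binary
 * string types: equal fixed lengths keep BINARY(n), anything else widens to
 * VARBINARY(max length).
 */
public final class BinaryTypeWidening {

    /** BINARY(n) when variableLength is false, VARBINARY(n) otherwise. */
    record BinaryType(boolean variableLength, int length) {
        static BinaryType binary(int length) {
            return new BinaryType(false, length);
        }

        static BinaryType varbinary(int length) {
            return new BinaryType(true, length);
        }

        @Override
        public String toString() {
            return (variableLength ? "VARBINARY(" : "BINARY(") + length + ")";
        }
    }

    /** Least restrictive type of two binary types under the simplified rule. */
    static BinaryType leastRestrictive(BinaryType a, BinaryType b) {
        int maxLength = Math.max(a.length(), b.length());
        boolean sameFixedLength =
                !a.variableLength() && !b.variableLength() && a.length() == b.length();
        return sameFixedLength ? BinaryType.binary(maxLength) : BinaryType.varbinary(maxLength);
    }

    /** Folds leastRestrictive over all types, e.g. all values of one VALUES column. */
    static BinaryType commonType(BinaryType first, BinaryType... rest) {
        BinaryType result = first;
        for (BinaryType type : rest) {
            result = leastRestrictive(result, type);
        }
        return result;
    }

    public static void main(String[] args) {
        BinaryType row1 = commonType(BinaryType.binary(3), BinaryType.binary(2)); // X'c0ffee', X'babe'
        BinaryType row3 = commonType(BinaryType.binary(1), BinaryType.binary(1)); // X'10', X'20'
        System.out.println(row1);                   // VARBINARY(3)
        System.out.println(row3);                   // BINARY(1)
        System.out.println(commonType(row1, row3)); // VARBINARY(3)
    }
}
```

Under this rule the common value type across the rows is VARBINARY(3), whereas the optimized physical plan of this query casts the first row's map to a map with BINARY(1) NOT NULL values before casting it to the sink type.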
The generated plan of this INSERT INTO is:
{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   +- LogicalUnion(all=[true])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
      :  +- LogicalValues(tuples=[[{ 0 }]])
      +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
         +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
   +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
      +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
+- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
   :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3,
[jira] [Created] (FLINK-26520) Implement SEARCH operator
Francesco Guardiani created FLINK-26520:
---
Summary: Implement SEARCH operator
Key: FLINK-26520
URL: https://issues.apache.org/jira/browse/FLINK-26520
Project: Flink
Issue Type: Technical Debt
Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

Right now, the code generator does not implement the SEARCH operator; instead, it uses the RexBuilder to work around it. We should implement the SEARCH operator directly, which would also remove this usage of the Flink type factory.
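Implementing SEARCH directly means evaluating "x SEARCH sarg" as a membership test over the ranges of the search argument, rather than rewriting it into other expressions first. A toy sketch of that evaluation, assuming a search argument is simply a list of ranges over longs with inclusive or exclusive bounds; SearchSketch and Range are hypothetical stand-ins, not Calcite's Sarg class, and SQL NULL semantics are deliberately not modeled.

```java
import java.util.List;

/**
 * Toy model of evaluating SEARCH(x, sarg) directly as a range-membership test.
 * Range is a hypothetical stand-in, not Calcite's Sarg; NULLs are not modeled.
 */
public final class SearchSketch {

    /** A range over longs with inclusive or exclusive bounds. */
    record Range(long lower, boolean lowerInclusive, long upper, boolean upperInclusive) {
        boolean contains(long x) {
            boolean aboveLower = lowerInclusive ? x >= lower : x > lower;
            boolean belowUpper = upperInclusive ? x <= upper : x < upper;
            return aboveLower && belowUpper;
        }

        static Range closed(long lower, long upper) {
            return new Range(lower, true, upper, true);
        }

        static Range point(long value) {
            return closed(value, value);
        }
    }

    /** SEARCH(x, sarg): true if x falls into any of the ranges. */
    static boolean search(long x, List<Range> sarg) {
        for (Range range : sarg) {
            if (range.contains(x)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Models (x BETWEEN 1 AND 5) OR x = 10 as a single search argument.
        List<Range> sarg = List.of(Range.closed(1, 5), Range.point(10));
        System.out.println(search(3, sarg));  // true
        System.out.println(search(7, sarg));  // false
        System.out.println(search(10, sarg)); // true
    }
}
```

A generated-code version would presumably specialize this loop per literal search argument; the sketch only shows the semantics being evaluated.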
[jira] [Created] (FLINK-26519) Remove FlinkTypeFactory.INSTANCE singleton
Francesco Guardiani created FLINK-26519:
---
Summary: Remove FlinkTypeFactory.INSTANCE singleton
Key: FLINK-26519
URL: https://issues.apache.org/jira/browse/FLINK-26519
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

This singleton is incorrect, because the user classloader needs to be passed in when creating structured types.
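A single static instance cannot carry the user classloader of each job or session, so classes that only exist in a user jar may not be resolvable through it. A minimal sketch of the alternative, assuming the factory simply stores the classloader it is constructed with and resolves structured-type implementation classes through it; ClassLoaderAwareTypeFactory and resolveImplementationClass are hypothetical names, not FlinkTypeFactory's API.

```java
import java.util.Objects;

/**
 * Hypothetical sketch: a type factory that resolves structured-type
 * implementation classes through the classloader it was created with,
 * instead of relying on a JVM-wide singleton. Not Flink's API.
 */
public final class ClassLoaderAwareTypeFactory {

    private final ClassLoader classLoader;

    public ClassLoaderAwareTypeFactory(ClassLoader classLoader) {
        this.classLoader = Objects.requireNonNull(classLoader, "classLoader");
    }

    /** Resolves the implementation class of a structured type by its fully qualified name. */
    public Class<?> resolveImplementationClass(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            return Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Class " + className + " is not visible to the given classloader", e);
        }
    }

    public ClassLoader getClassLoader() {
        return classLoader;
    }
}
```

Each planner or session would create its own factory with its user classloader, so two jobs with different user jars no longer share one resolution context.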
[jira] [Created] (FLINK-26463) TableEnvironmentITCase should use MiniCluster
Francesco Guardiani created FLINK-26463:
---
Summary: TableEnvironmentITCase should use MiniCluster
Key: FLINK-26463
URL: https://issues.apache.org/jira/browse/FLINK-26463
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26422) Update Chinese documentation with the new TablePipeline docs
Francesco Guardiani created FLINK-26422:
---
Summary: Update Chinese documentation with the new TablePipeline docs
Key: FLINK-26422
URL: https://issues.apache.org/jira/browse/FLINK-26422
Project: Flink
Issue Type: Bug
Components: Documentation
Reporter: Francesco Guardiani

The Chinese docs need to be updated with the content of this commit: https://github.com/apache/flink/commit/4f65c7950f2c3ef849f2094deab0e199ffedf57b
[jira] [Created] (FLINK-26280) Add a flag to disable uid generation
Francesco Guardiani created FLINK-26280:
---
Summary: Add a flag to disable uid generation
Key: FLINK-26280
URL: https://issues.apache.org/jira/browse/FLINK-26280
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Francesco Guardiani

We should add a flag to disable uid generation for backward compatibility. See the discussion here: https://issues.apache.org/jira/browse/FLINK-25932
[jira] [Created] (FLINK-26252) Refactor MiniClusterExtension
Francesco Guardiani created FLINK-26252:
---
Summary: Refactor MiniClusterExtension
Key: FLINK-26252
URL: https://issues.apache.org/jira/browse/FLINK-26252
Project: Flink
Issue Type: Bug
Components: Test Infrastructure
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26249) Run BuiltInFunctionITCase tests in parallel
Francesco Guardiani created FLINK-26249:
---
Summary: Run BuiltInFunctionITCase tests in parallel
Key: FLINK-26249
URL: https://issues.apache.org/jira/browse/FLINK-26249
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner, Test Infrastructure
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani
[jira] [Created] (FLINK-26131) CompiledPlan should implement Executable
Francesco Guardiani created FLINK-26131:
---
Summary: CompiledPlan should implement Executable
Key: FLINK-26131
URL: https://issues.apache.org/jira/browse/FLINK-26131
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Reporter: Francesco Guardiani

To do that, we need to keep a reference to the TableEnvironment within the CompiledPlan implementation. We should also check that the TableEnvironment matches.
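A minimal sketch of the shape described above, assuming "the TableEnvironment matches" means the plan may only be used with the environment instance that compiled it (that interpretation is an assumption). Executable, Environment and CompiledPlan below are hypothetical stand-ins, not Flink's TableEnvironment, CompiledPlan or Executable interfaces.

```java
import java.util.Objects;

/**
 * Hypothetical sketch (not Flink's API): a compiled plan that implements
 * Executable by keeping a reference to the environment that compiled it.
 */
public final class CompiledPlanSketch {

    /** Stand-in for something that can be executed on its own. */
    interface Executable {
        String execute();
    }

    /** Stand-in for TableEnvironment: it knows how to run a compiled plan. */
    interface Environment {
        String executePlan(CompiledPlan plan);
    }

    static final class CompiledPlan implements Executable {
        private final Environment environment;
        private final String planJson;

        CompiledPlan(Environment environment, String planJson) {
            this.environment = Objects.requireNonNull(environment, "environment");
            this.planJson = Objects.requireNonNull(planJson, "planJson");
        }

        String planJson() {
            return planJson;
        }

        /** Delegates to the environment captured when the plan was compiled. */
        @Override
        public String execute() {
            return environment.executePlan(this);
        }

        /** Assumed check: the plan may only be used with the environment that created it. */
        void checkEnvironment(Environment other) {
            if (other != environment) {
                throw new IllegalStateException(
                        "This plan was compiled by a different environment");
            }
        }
    }
}
```

Keeping the reference inside the plan is what lets plan.execute() work without the caller passing the environment again.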
[jira] [Created] (FLINK-26128) Improve Table, Expressions and related classes Javadocs/Scaladocs
Francesco Guardiani created FLINK-26128:
---
Summary: Improve Table, Expressions and related classes Javadocs/Scaladocs
Key: FLINK-26128
URL: https://issues.apache.org/jira/browse/FLINK-26128
Project: Flink
Issue Type: Bug
Components: Documentation, Table SQL / API
Reporter: Francesco Guardiani

Right now, the Javadocs and Scaladocs in Table and Expressions are inconsistent: each of them uses a different format, and some Javadocs even contain Scala examples. We should reword them consistently, remove Scala examples where they are not necessary, and expand the Java examples.
[jira] [Created] (FLINK-26127) Cleanup usage of deprecated table methods in doc
Francesco Guardiani created FLINK-26127:
---
Summary: Cleanup usage of deprecated table methods in doc
Key: FLINK-26127
URL: https://issues.apache.org/jira/browse/FLINK-26127
Project: Flink
Issue Type: Bug
Components: Documentation, Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

Many examples in our docs use deprecated methods. We should clean them up and replace them with the non-deprecated methods.
[jira] [Created] (FLINK-26125) Doc overhaul for the CAST behaviour
Francesco Guardiani created FLINK-26125:
---
Summary: Doc overhaul for the CAST behaviour
Key: FLINK-26125
URL: https://issues.apache.org/jira/browse/FLINK-26125
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani

This includes:
* Proper documentation of the new TRY_CAST
* A CAST matrix documenting which source/target type combinations are supported
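The difference the TRY_CAST documentation needs to explain is that CAST fails on an invalid input, while TRY_CAST returns NULL instead. A small Java analogy for a STRING to INT conversion, assuming plain Integer.valueOf parsing; it does not reproduce Flink's actual string-to-integer casting rules, and TryCastAnalogy is a hypothetical name.

```java
/**
 * Java analogy (not Flink code) of CAST vs TRY_CAST for STRING -> INT:
 * CAST fails on invalid input, TRY_CAST returns null instead.
 */
public final class TryCastAnalogy {

    /** Like CAST(s AS INT): throws NumberFormatException on invalid input. */
    static Integer cast(String s) {
        if (s == null) {
            return null; // casting NULL yields NULL
        }
        return Integer.valueOf(s);
    }

    /** Like TRY_CAST(s AS INT): returns null instead of failing. */
    static Integer tryCast(String s) {
        try {
            return cast(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

In a query, the first behavior fails the job on a bad record, while the second lets the job continue with NULL in that column.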
[jira] [Created] (FLINK-26090) Remove pre FLIP-84 methods
Francesco Guardiani created FLINK-26090:
---
Summary: Remove pre FLIP-84 methods
Key: FLINK-26090
URL: https://issues.apache.org/jira/browse/FLINK-26090
Project: Flink
Issue Type: Bug
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 and https://issues.apache.org/jira/browse/FLINK-16364.
[jira] [Created] (FLINK-26089) Introduce TablePipeline
Francesco Guardiani created FLINK-26089:
---
Summary: Introduce TablePipeline
Key: FLINK-26089
URL: https://issues.apache.org/jira/browse/FLINK-26089
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI
[jira] [Created] (FLINK-26071) TableEnvironment#compilePlan should fail
Francesco Guardiani created FLINK-26071:
---
Summary: TableEnvironment#compilePlan should fail
Key: FLINK-26071
URL: https://issues.apache.org/jira/browse/FLINK-26071
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

FLINK-25841 introduced {{compilePlan}} in {{TableEnvironment}}. This API should fail directly, rather than only failing later in {{writeToFile}}.
[jira] [Created] (FLINK-26060) Make Python specific exec nodes unsupported
Francesco Guardiani created FLINK-26060:
---
Summary: Make Python specific exec nodes unsupported
Key: FLINK-26060
URL: https://issues.apache.org/jira/browse/FLINK-26060
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

These nodes use the old type system, which is going to be removed soon. We should not support them in the persisted plan, as we cannot commit to supporting them. Once PyFlink is migrated to the new type system, it won't need these nodes anymore and will rely on the new Table function stack. For more details, see also https://issues.apache.org/jira/browse/FLINK-25231
[jira] [Created] (FLINK-26054) Enable maven-enforcer to disable table-planner and table-runtime as dependencies
Francesco Guardiani created FLINK-26054:
---
Summary: Enable maven-enforcer to disable table-planner and table-runtime as dependencies
Key: FLINK-26054
URL: https://issues.apache.org/jira/browse/FLINK-26054
Project: Flink
Issue Type: Bug
Components: Build System, Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

From https://github.com/apache/flink/pull/18676#discussion_r802502438: we should enable the enforcer for the table-planner and table-runtime modules, as described in flink-table/README.md.
[jira] [Created] (FLINK-26053) Fix parser generator warnings
Francesco Guardiani created FLINK-26053:
---
Summary: Fix parser generator warnings
Key: FLINK-26053
URL: https://issues.apache.org/jira/browse/FLINK-26053
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Reporter: Francesco Guardiani

When building flink-sql-parser, javacc logs a couple of warnings:
{code}
Warning: Choice conflict in [...] construct at line 2920, column 5.
         Expansion nested within construct and expansion following construct
         have common prefixes, one of which is: "PLAN"
         Consider using a lookahead of 2 or more for nested expansion.
Warning: Choice conflict involving two expansions at
         line 2930, column 9 and line 2932, column 9 respectively.
         A common prefix is: "STATEMENT"
         Consider using a lookahead of 2 for earlier expansion.
Warning: Choice conflict involving two expansions at
         line 2952, column 9 and line 2954, column 9 respectively.
         A common prefix is: "STATEMENT"
         Consider using a lookahead of 2 for earlier expansion.
{code}
Taken from https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=3911

We should investigate them, as they are often a symptom of a bug in our parser template, which might result in unexpected parsing errors.
[jira] [Created] (FLINK-25986) Add FLIP-190 new API methods to python
Francesco Guardiani created FLINK-25986:
---
Summary: Add FLIP-190 new API methods to python
Key: FLINK-25986
URL: https://issues.apache.org/jira/browse/FLINK-25986
Project: Flink
Issue Type: Bug
Components: API / Python, Table SQL / API
Reporter: Francesco Guardiani
[jira] [Created] (FLINK-25942) Use jackson jdk8/time modules for Duration ser/de
Francesco Guardiani created FLINK-25942:
---
Summary: Use jackson jdk8/time modules for Duration ser/de
Key: FLINK-25942
URL: https://issues.apache.org/jira/browse/FLINK-25942
Project: Flink
Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani

https://issues.apache.org/jira/browse/FLINK-25588 added the Jackson jdk8 and datetime modules to the Jackson shaded in flink-shaded, so we no longer need our own Duration ser/de (which was introduced only because we lacked this Jackson module).
[jira] [Created] (FLINK-25918) Use FileEnumerator to implement filter pushdown of filepath metadata
Francesco Guardiani created FLINK-25918:
---
Summary: Use FileEnumerator to implement filter pushdown of filepath metadata
Key: FLINK-25918
URL: https://issues.apache.org/jira/browse/FLINK-25918
Project: Flink
Issue Type: Improvement
Components: Connectors / FileSystem
Reporter: Francesco Guardiani

Right now, unless you configure partition keys, the filesystem table source ingests all the files in the provided {{path}}. This means that a query like:
{code:sql}
SELECT * FROM MyFileTable WHERE filepath LIKE '%.csv'
{code}
ingests all the files, and only after the records have been loaded into Flink does the filter discard the records that do not come from a file matching the pattern "%.csv". Using the filter pushdown feature provided by the DynamicTableSource stack, we could instead pass the {{FileSourceBuilder}} a {{FileEnumerator}} that filters the input files directly, so that non-matching files are never read.
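The core of the proposed pushdown is turning the predicate "filepath LIKE '<pattern>'" into a path filter that runs before any file is opened. A minimal sketch, assuming only the % and _ wildcards (no ESCAPE clause) and plain string paths; LikePathFilter and its methods are hypothetical, and wiring the predicate into an actual FileEnumerator is left out.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: translate a pushed-down `filepath LIKE '<pattern>'`
 * into a path predicate, so non-matching files are skipped before reading.
 */
public final class LikePathFilter {

    /** Translates a SQL LIKE pattern (% and _ wildcards, no ESCAPE clause) into a regex. */
    static Pattern likeToRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            switch (c) {
                case '%' -> regex.append(".*");  // any sequence of characters
                case '_' -> regex.append('.');   // exactly one character
                default -> regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** LIKE must match the whole path, hence matches() rather than find(). */
    static Predicate<String> pathPredicate(String likePattern) {
        Pattern pattern = likeToRegex(likePattern);
        return path -> pattern.matcher(path).matches();
    }

    /** Keeps only the files whose path matches; the others would never be opened. */
    static List<String> filterFiles(List<String> paths, String likePattern) {
        Predicate<String> predicate = pathPredicate(likePattern);
        return paths.stream().filter(predicate).collect(Collectors.toList());
    }
}
```

For example, filtering ["/data/a.csv", "/data/b.json", "/data/c.csv"] with "%.csv" keeps only the two CSV paths, so the JSON file would not be read at all.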
[jira] [Created] (FLINK-25897) Update project configuration gradle doc to 7.x version
Francesco Guardiani created FLINK-25897: --- Summary: Update project configuration gradle doc to 7.x version Key: FLINK-25897 URL: https://issues.apache.org/jira/browse/FLINK-25897 Project: Flink Issue Type: Bug Reporter: Francesco Guardiani Assignee: Mario Rincon-Nigro Update the gradle build script and its doc page to 7.x -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25895) Add ExecNodeGraph ser/de
Francesco Guardiani created FLINK-25895: --- Summary: Add ExecNodeGraph ser/de Key: FLINK-25895 URL: https://issues.apache.org/jira/browse/FLINK-25895 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani Assignee: Francesco Guardiani Right now we have an intermediate model called {{JsonPlanGraph}} to ser/de the ExecNodeGraph. This model is fine, but we should perform the conversion through Jackson rather than serializing and deserializing it manually with {{ExecNodeGraphJsonPlanGenerator}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25809) Introduce test infra for building FLIP-190 tests
Francesco Guardiani created FLINK-25809: --- Summary: Introduce test infra for building FLIP-190 tests Key: FLINK-25809 URL: https://issues.apache.org/jira/browse/FLINK-25809 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani Assignee: Francesco Guardiani FLIP-190 requires building a new test infrastructure. With it, we want to define test cases and data once, and then for each case execute the following: * An integration test that roughly does {{create plan -> execute job -> trigger savepoint -> stop job -> restore plan -> restore savepoint -> execute job -> stop and assert}}. The plan and savepoint should be committed to git, so running these tests when a plan and savepoint are already available does not regenerate them. * A change detection test that checks that, for the defined test cases, the plan hasn't changed, similar to the existing {{JsonPlanITCase}} tests. * A completeness/coverage check, that is, count how many times ExecNodes (including versions) are used in the test cases, and fail if an ExecNode version is never covered. Another requirement is to "version" the test cases, that is, for each test case we can retain different versions of the plan and savepoint, to make sure that after we introduce a new plan change, the old plan still continues to run. -- This message was sent by Atlassian Jira (v8.20.1#820001)
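For the coverage requirement of FLINK-25809, a minimal sketch, assuming the infrastructure can list every declared (ExecNode, version) pair and collect the pairs actually used by the test-case plans. It only checks coverage rather than counting usages; `ExecNodeCoverage` and the `name_vN` naming are hypothetical, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the completeness check: every (ExecNode, version)
// pair that exists must be exercised by at least one test case.
final class ExecNodeCoverage {

    // declared: ExecNode name -> all versions that exist in the planner.
    // used: ExecNode name -> versions appearing in the test cases' plans.
    // Returns the uncovered pairs as "name_vN", sorted by name then version.
    static List<String> uncovered(Map<String, Set<Integer>> declared,
                                  Map<String, Set<Integer>> used) {
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, Set<Integer>> e : new TreeMap<>(declared).entrySet()) {
            Set<Integer> usedVersions = used.getOrDefault(e.getKey(), Set.of());
            for (int version : new TreeSet<>(e.getValue())) {
                if (!usedVersions.contains(version)) {
                    missing.add(e.getKey() + "_v" + version);
                }
            }
        }
        return missing;
    }

    // The test fails if any ExecNode version is never covered.
    static void assertFullyCovered(Map<String, Set<Integer>> declared,
                                   Map<String, Set<Integer>> used) {
        List<String> missing = uncovered(declared, used);
        if (!missing.isEmpty()) {
            throw new AssertionError("ExecNode versions never covered: " + missing);
        }
    }
}
```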
[jira] [Created] (FLINK-25791) Make ObjectIdentifier json representation simpler
Francesco Guardiani created FLINK-25791: --- Summary: Make ObjectIdentifier json representation simpler Key: FLINK-25791 URL: https://issues.apache.org/jira/browse/FLINK-25791 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani Assignee: Francesco Guardiani Use {{ObjectIdentifier#asSerializableString}} to serialize the object identifier, rather than serializing it as an object. -- This message was sent by Atlassian Jira (v8.20.1#820001)
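For FLINK-25791, a sketch of the string shape this would produce, assuming `asSerializableString` renders the identifier as three backtick-quoted parts joined by dots, with embedded backticks doubled. `IdentifierSketch` is a hypothetical stand-in written with plain strings, not Flink's `ObjectIdentifier` class.

```java
// Hypothetical stand-in for ObjectIdentifier, only to show the assumed string
// shape: each part is quoted with backticks, embedded backticks are doubled.
final class IdentifierSketch {
    static String escape(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    // One JSON string such as "`cat`.`db`.`tbl`" instead of a JSON object
    // with separate catalog/database/object fields.
    static String asSerializableString(String catalog, String database, String object) {
        return escape(catalog) + "." + escape(database) + "." + escape(object);
    }
}
```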
[jira] [Created] (FLINK-25779) Define ConfigOption for properties map in Kinesis
Francesco Guardiani created FLINK-25779: --- Summary: Define ConfigOption for properties map in Kinesis Key: FLINK-25779 URL: https://issues.apache.org/jira/browse/FLINK-25779 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: Francesco Guardiani Right now there is no {{ConfigOption}} definition of {{properties.*}} in the Kinesis connector. This breaks option forwarding when restoring the table options from the persisted plan/catalog. We can define it using {{ConfigOptionBuilder#mapType}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25778) Define ConfigOption for properties map in Kafka
Francesco Guardiani created FLINK-25778: --- Summary: Define ConfigOption for properties map in Kafka Key: FLINK-25778 URL: https://issues.apache.org/jira/browse/FLINK-25778 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: Francesco Guardiani Right now there is no {{ConfigOption}} definition of {{properties.*}} in the Kafka connector. This breaks option forwarding when restoring the table options from the persisted plan/catalog. We can define it using {{ConfigOptionBuilder#mapType}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
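For FLINK-25778 (and the analogous Kinesis issue FLINK-25779), a sketch of what a single map-typed option keyed `properties` would hold: all `properties.*` table options collected into one map with the prefix stripped. This models the idea with the JDK only and does not use the Flink `mapType` builder mentioned above; `PrefixedOptions` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: collects the "<prefix>.*" table options into one map,
// i.e. the value a single map-typed ConfigOption keyed "<prefix>" would hold.
final class PrefixedOptions {
    static Map<String, String> extract(Map<String, String> tableOptions, String prefix) {
        Map<String, String> result = new TreeMap<>();
        String keyPrefix = prefix + ".";
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(keyPrefix)) {
                // "properties.group.id" -> "group.id"
                result.put(e.getKey().substring(keyPrefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```

Because the whole map is then one declared option, it can be persisted and forwarded like any other option instead of being an undeclared set of free-form keys.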
[jira] [Created] (FLINK-25777) Generate documentation for Table factories (formats and connectors)
Francesco Guardiani created FLINK-25777: --- Summary: Generate documentation for Table factories (formats and connectors) Key: FLINK-25777 URL: https://issues.apache.org/jira/browse/FLINK-25777 Project: Flink Issue Type: Technical Debt Components: Connectors / Common, Documentation, Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Francesco Guardiani The goal of this issue is to automatically generate, from code, the documentation of the configuration options for table connectors and formats. This issue includes: * Tweak {{ConfigOptionsDocGenerator}} to work with {{Factory#requiredOptions}}, {{Factory#optionalOptions}} and the newly introduced {{DynamicTableFactory#forwardOptions}} and {{FormatFactory#forwardOptions}}. Also see https://github.com/apache/flink/pull/18290 as a reference. From these methods we should extract whether an option is required or not, and whether it's forwardable or not. * Decide what the generator output should be, and how to link/include it in the connector/format docs pages. * Enable the code generator in CI. * Regenerate all the existing docs. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25588) Add jdk8 and datetime module to jackson shaded
Francesco Guardiani created FLINK-25588: --- Summary: Add jdk8 and datetime module to jackson shaded Key: FLINK-25588 URL: https://issues.apache.org/jira/browse/FLINK-25588 Project: Flink Issue Type: Improvement Components: BuildSystem / Shaded Reporter: Francesco Guardiani Assignee: Francesco Guardiani Add modules jdk8 and jsr310 to jackson shaded. https://github.com/FasterXML/jackson-modules-java8 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25526) Deprecate TableSinkFactory, TableSourceFactory and TableFormatFactory
Francesco Guardiani created FLINK-25526: --- Summary: Deprecate TableSinkFactory, TableSourceFactory and TableFormatFactory Key: FLINK-25526 URL: https://issues.apache.org/jira/browse/FLINK-25526 Project: Flink Issue Type: Technical Debt Components: Table SQL / API Reporter: Francesco Guardiani Fix For: 1.15.0 These factories are part of the old type system/sink & source stack and should not be used anymore, as users should work with DynamicTableSink/DynamicTableSource factories instead. This task should deprecate {{TableSinkFactory}}, {{TableSourceFactory}} and {{TableFormatFactory}} and all the related types. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25518) Harden JSON Serialization utilities
Francesco Guardiani created FLINK-25518: --- Summary: Harden JSON Serialization utilities Key: FLINK-25518 URL: https://issues.apache.org/jira/browse/FLINK-25518 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Assignee: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25428) Expose complex types CAST to String
Francesco Guardiani created FLINK-25428: --- Summary: Expose complex types CAST to String Key: FLINK-25428 URL: https://issues.apache.org/jira/browse/FLINK-25428 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Attachments: cast_function_it_case.patch, logical_type_casts.patch Right now we have all the casting rules from collection, structured and raw types to string, that is, we have logic to stringify the following types: * ARRAY * MAP * MULTISET * ROW * STRUCTURED * RAW Unfortunately these don't work, for different reasons, notably: * We need to support these combinations in {{LogicalTypeCasts}} (check the attached patch) * For some of them Calcite applies its own casting validation logic and marks them as invalid * For MULTISET and STRUCTURED, there are issues specific to the Table API and its expression stack, which cannot correctly convert the values to literals You can check all these errors by applying the attached patch to the cast function IT cases. We need to fix these issues, so users can use SQL and the Table API to cast these values to string. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25300) Remove CEIL and FLOOR call with one argument for integral values
Francesco Guardiani created FLINK-25300: --- Summary: Remove CEIL and FLOOR call with one argument for integral values Key: FLINK-25300 URL: https://issues.apache.org/jira/browse/FLINK-25300 Project: Flink Issue Type: Technical Debt Components: Table SQL / Runtime Reporter: Francesco Guardiani Right now, when the user invokes {{FLOOR}} or {{CEIL}} on any integral number, a call to {{Math.floor}}/{{Math.ceil}} is generated, which is a no-op and may also produce incorrect results. We should instead implement a rule in the planner (or reuse the expression reducer rule) that removes {{FLOOR}} and {{CEIL}} when their argument has an integral type, that is TINYINT, SMALLINT, INT or BIGINT. -- This message was sent by Atlassian Jira (v8.20.1#820001)
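For FLINK-25300, a small JDK-only illustration of how a generated `Math.floor` can go wrong on BIGINT, assuming the generated code widens the `long` to `double` (the only overload `Math.floor` has): above 2^53 a `double` cannot represent every `long`, so the round trip changes the value. `IntegralFloor` is hypothetical; `floorIntegral` models the proposed rewrite, where the call is simply removed.

```java
// Illustrates why Math.floor/Math.ceil on integral values should be removed:
// the long is widened to double, which cannot represent every long exactly.
final class IntegralFloor {
    // What a generated Math.floor call effectively computes for a BIGINT.
    static long floorViaDouble(long value) {
        return (long) Math.floor((double) value);
    }

    // Hypothetical planner-side simplification: FLOOR/CEIL of an integral
    // argument is the argument itself, so the call can be dropped entirely.
    static long floorIntegral(long value) {
        return value;
    }
}
```

For `9007199254740993L` (2^53 + 1), `floorViaDouble` returns `9007199254740992L`, while `floorIntegral` returns the input unchanged.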
[jira] [Created] (FLINK-25299) Improve LIKE operator efficiency
Francesco Guardiani created FLINK-25299: --- Summary: Improve LIKE operator efficiency Key: FLINK-25299 URL: https://issues.apache.org/jira/browse/FLINK-25299 Project: Flink Issue Type: Technical Debt Components: Table SQL / Runtime Reporter: Francesco Guardiani Right now LIKE is implemented using regexes (check {{SqlLikeUtils}} provided by https://issues.apache.org/jira/browse/FLINK-25282). We can improve it either by removing the regex-based implementation and implementing the matching code manually, or at least by caching the regex in the codegen, assigning the compiled pattern to a field of the generated function, as parsing the pattern is usually an expensive operation. -- This message was sent by Atlassian Jira (v8.20.1#820001)
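For the first option in FLINK-25299 (manual matching instead of regexes), a sketch of a hand-written LIKE matcher, assuming only the `%` and `_` wildcards and no `ESCAPE` clause. It uses the classic two-pointer wildcard algorithm that backtracks to the last `%`, so no pattern is compiled per call. `LikeMatcher` is hypothetical and not Flink's `SqlLikeUtils`.

```java
// Hypothetical hand-written LIKE matcher: '%' matches any sequence (including
// the empty one), '_' matches exactly one character. ESCAPE is not handled.
final class LikeMatcher {
    static boolean like(String s, String pattern) {
        int si = 0;
        int pi = 0;
        int starPi = -1; // position of the last '%' seen in the pattern
        int starSi = -1; // input position that '%' is currently matched up to
        while (si < s.length()) {
            if (pi < pattern.length()
                    && (pattern.charAt(pi) == '_'
                        || (pattern.charAt(pi) != '%' && pattern.charAt(pi) == s.charAt(si)))) {
                // Single-character match: advance both.
                si++;
                pi++;
            } else if (pi < pattern.length() && pattern.charAt(pi) == '%') {
                // Remember the '%' and first try matching it against nothing.
                starPi = pi++;
                starSi = si;
            } else if (starPi != -1) {
                // Mismatch: let the last '%' absorb one more input character.
                pi = starPi + 1;
                si = ++starSi;
            } else {
                return false;
            }
        }
        // Trailing '%' can match the empty remainder.
        while (pi < pattern.length() && pattern.charAt(pi) == '%') {
            pi++;
        }
        return pi == pattern.length();
    }
}
```

The second option would keep the regex but compile it once into a field of the generated function, much like the `LikeFileFilter` constructor does for FLINK-25918.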
[jira] [Created] (FLINK-25282) Move runtime dependencies from table-planner to table-runtime
Francesco Guardiani created FLINK-25282: --- Summary: Move runtime dependencies from table-planner to table-runtime Key: FLINK-25282 URL: https://issues.apache.org/jira/browse/FLINK-25282 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani Assignee: Francesco Guardiani There are several runtime dependencies (e.g. functions used in codegen) that are shipped by table-planner and calcite-core. We should move these dependencies to table-runtime. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25229) Introduce flink-table-api-bridge-common
Francesco Guardiani created FLINK-25229: --- Summary: Introduce flink-table-api-bridge-common Key: FLINK-25229 URL: https://issues.apache.org/jira/browse/FLINK-25229 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Planner Reporter: Francesco Guardiani This package should deduplicate code from api-java-bridge and api-scala-bridge, notably: * the various operations provided by both {{ScalaDataStreamQueryOperation}} and {{JavaDataStreamQueryOperation}} (which are essentially the same code) * some code in {{StreamTableEnvironmentImpl}} and {{StreamStatementSetImpl}} The end goal is to remove the planner's runtime (not test) dependency on flink-scala-api and flink-scala-api-bridge. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25228) Introduce flink-table-test-utils
Francesco Guardiani created FLINK-25228: --- Summary: Introduce flink-table-test-utils Key: FLINK-25228 URL: https://issues.apache.org/jira/browse/FLINK-25228 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Ecosystem, Table SQL / Planner Reporter: Francesco Guardiani Introduce a package that ships test utilities for formats, connectors and end users. This package should provide: * Assertions for data types, logical types and internal data structures. * Test cases for formats and connectors The end goal is to remove the planner test-jar dependency from formats and connectors and replace it with this package, so formats and connectors can then just depend on table-planner-loader. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25158) Fix formatting for true, false and null to uppercase
Francesco Guardiani created FLINK-25158: --- Summary: Fix formatting for true, false and null to uppercase Key: FLINK-25158 URL: https://issues.apache.org/jira/browse/FLINK-25158 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani All the cast rules using the constant strings {{true}}, {{false}} and {{null}} should use {{TRUE}}, {{FALSE}} and {{NULL}} instead. This behavior should be enabled only when the legacy behavior is disabled. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25156) DISTINCT is not handled correctly by CastRules
Francesco Guardiani created FLINK-25156: --- Summary: DISTINCT is not handled correctly by CastRules Key: FLINK-25156 URL: https://issues.apache.org/jira/browse/FLINK-25156 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani We need to support DISTINCT types either in CastRuleProvider or through some sort of meta rule, so that every rule can support them. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25157) Introduce NULL type to string cast rule
Francesco Guardiani created FLINK-25157: --- Summary: Introduce NULL type to string cast rule Key: FLINK-25157 URL: https://issues.apache.org/jira/browse/FLINK-25157 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25131) Update sql client to ship flink-table-planner-loader instead of flink-table-planner
Francesco Guardiani created FLINK-25131: --- Summary: Update sql client to ship flink-table-planner-loader instead of flink-table-planner Key: FLINK-25131 URL: https://issues.apache.org/jira/browse/FLINK-25131 Project: Flink Issue Type: Sub-task Components: Table SQL / Client Reporter: Francesco Guardiani See the parent task for more details. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25130) Update flink-table-uber to ship flink-table-planner-loader instead of flink-table-planner
Francesco Guardiani created FLINK-25130: --- Summary: Update flink-table-uber to ship flink-table-planner-loader instead of flink-table-planner Key: FLINK-25130 URL: https://issues.apache.org/jira/browse/FLINK-25130 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani This change should also be tested with the SQL client, and then by our e2e tests, specifically flink-end-to-end-tests/flink-batch-sql-test. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25129) Update docs and examples to use flink-table-planner-loader instead of flink-table-planner
Francesco Guardiani created FLINK-25129: --- Summary: Update docs and examples to use flink-table-planner-loader instead of flink-table-planner Key: FLINK-25129 URL: https://issues.apache.org/jira/browse/FLINK-25129 Project: Flink Issue Type: Sub-task Components: Documentation, Examples, Table SQL / API Reporter: Francesco Guardiani For more details, see https://docs.google.com/document/d/12yDUCnvcwU2mODBKTHQ1xhfOq1ujYUrXltiN_rbhT34/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25128) Introduce flink-table-planner-loader
Francesco Guardiani created FLINK-25128: --- Summary: Introduce flink-table-planner-loader Key: FLINK-25128 URL: https://issues.apache.org/jira/browse/FLINK-25128 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani For more details, see https://docs.google.com/document/d/12yDUCnvcwU2mODBKTHQ1xhfOq1ujYUrXltiN_rbhT34/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25114) Remove flink-scala dependency from flink-table-runtime
Francesco Guardiani created FLINK-25114: --- Summary: Remove flink-scala dependency from flink-table-runtime Key: FLINK-25114 URL: https://issues.apache.org/jira/browse/FLINK-25114 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani Assignee: Francesco Guardiani flink-table-runtime should no longer need flink-scala. We should try to remove it in order to help with the parent task [FLINK-24427|https://issues.apache.org/jira/browse/FLINK-24427] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic
Francesco Guardiani created FLINK-25113: --- Summary: Cleanup from Parquet and Orc the partition key handling logic Key: FLINK-25113 URL: https://issues.apache.org/jira/browse/FLINK-25113 Project: Flink Issue Type: Technical Debt Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Francesco Guardiani After https://issues.apache.org/jira/browse/FLINK-24617, the partition key handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We should clean up this logic from the ORC and Parquet formats, in order to simplify them. Note: Hive still depends on this logic, but it should rather use {{FileInfoExtractorBulkFormat}} or similar. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25090) Run the assertj conversion script to convert assertions
Francesco Guardiani created FLINK-25090: --- Summary: Run the assertj conversion script to convert assertions Key: FLINK-25090 URL: https://issues.apache.org/jira/browse/FLINK-25090 Project: Flink Issue Type: Bug Reporter: Francesco Guardiani Assignee: Francesco Guardiani See https://assertj.github.io/doc/#assertj-migration-using-regexes for more details -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25079) Add assertj assertions for table types
Francesco Guardiani created FLINK-25079: --- Summary: Add assertj assertions for table types Key: FLINK-25079 URL: https://issues.apache.org/jira/browse/FLINK-25079 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Francesco Guardiani Assignee: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25075) Remove reflection to instantiate PlannerExpressionParser
Francesco Guardiani created FLINK-25075: --- Summary: Remove reflection to instantiate PlannerExpressionParser Key: FLINK-25075 URL: https://issues.apache.org/jira/browse/FLINK-25075 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Planner Reporter: Francesco Guardiani Assignee: Francesco Guardiani This reflection leaks the planner module classpath and can cause issues when isolating the classpath -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25061) Add assertj as dependency in flink-parent
Francesco Guardiani created FLINK-25061: --- Summary: Add assertj as dependency in flink-parent Key: FLINK-25061 URL: https://issues.apache.org/jira/browse/FLINK-25061 Project: Flink Issue Type: Bug Components: Tests Reporter: Francesco Guardiani Assignee: Francesco Guardiani We recently discussed test assertions on the ML (https://lists.apache.org/thread/33t7hz8w873p1bc5msppk65792z08rgt) and came to the conclusion that we want to encourage contributors to use assertj as much as possible. To do that, we should add assertj to the flink-parent pom as a test dependency. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25060) Replace DataType.projectFields with Projection type
Francesco Guardiani created FLINK-25060: --- Summary: Replace DataType.projectFields with Projection type Key: FLINK-25060 URL: https://issues.apache.org/jira/browse/FLINK-25060 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Francesco Guardiani Assignee: Francesco Guardiani FLINK-24399 introduced new methods in DataType to perform data type projections. Note: no release has included these changes yet. FLINK-24776 introduced a new, more powerful type to perform operations on projections, that is projecting types, but also computing differences and complements. To avoid providing different entry points for the same functionality, we should clean up the new methods introduced by FLINK-24399 and replace them with the new Projection type. We should also deprecate the functions in DataTypeUtils. -- This message was sent by Atlassian Jira (v8.20.1#820001)
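For FLINK-25060, a JDK-only model of two projection operations on top-level fields, assuming a projection is an ordered `int[]` of field indexes and that the complement is the ordered list of the remaining indexes in `[0, fieldsNumber)`. `ProjectionSketch` is hypothetical; it is not Flink's Projection type and does not cover nested projections or the difference operation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a top-level projection as an ordered int[] of field
// indexes. Not Flink's Projection type; it only illustrates two operations.
final class ProjectionSketch {
    private final int[] indexes;

    ProjectionSketch(int... indexes) {
        this.indexes = indexes.clone();
    }

    // Applies the projection to a list of fields (names, types, ...), in order.
    <T> List<T> project(List<T> fields) {
        List<T> out = new ArrayList<>();
        for (int i : indexes) {
            out.add(fields.get(i));
        }
        return out;
    }

    // Ordered indexes in [0, fieldsNumber) that this projection does not select.
    ProjectionSketch complement(int fieldsNumber) {
        boolean[] selected = new boolean[fieldsNumber];
        for (int i : indexes) {
            selected[i] = true;
        }
        int[] rest = new int[fieldsNumber];
        int n = 0;
        for (int i = 0; i < fieldsNumber; i++) {
            if (!selected[i]) {
                rest[n++] = i;
            }
        }
        return new ProjectionSketch(Arrays.copyOf(rest, n));
    }

    int[] toArray() {
        return indexes.clone();
    }
}
```

Keeping both operations on one type is the point of the issue: callers no longer need separate helpers in `DataType` and `DataTypeUtils` for related projection logic.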
[jira] [Created] (FLINK-25052) Port row to row cast logic to CastRule
Francesco Guardiani created FLINK-25052: --- Summary: Port row to row cast logic to CastRule Key: FLINK-25052 URL: https://issues.apache.org/jira/browse/FLINK-25052 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25051) Port raw <-> binary logic to CastRule
Francesco Guardiani created FLINK-25051: --- Summary: Port raw <-> binary logic to CastRule Key: FLINK-25051 URL: https://issues.apache.org/jira/browse/FLINK-25051 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Assignee: Francesco Guardiani More details on the parent task -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24924) TO_TIMESTAMP and TO_DATE should fail
Francesco Guardiani created FLINK-24924: --- Summary: TO_TIMESTAMP and TO_DATE should fail Key: FLINK-24924 URL: https://issues.apache.org/jira/browse/FLINK-24924 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani In a similar fashion to what is described in https://issues.apache.org/jira/browse/FLINK-24385, TO_TIMESTAMP and TO_DATE should fail instead of silently returning {{null}} when parsing fails. For these two functions in particular, a parsing failure could lead to very unexpected behavior, for example records with a null rowtime. We should change these functions to fail by default when parsing generates an error. We can let users handle errors by letting them use TRY_CAST for the same functionality:
{code:sql}
-- This fails when input is invalid
TO_TIMESTAMP(input)
-- Behaves the same as above
CAST(input AS TIMESTAMP)
-- This returns null when input is invalid
TRY_CAST(input AS TIMESTAMP)
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
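For FLINK-24924, a JDK-only sketch of the two behaviours being contrasted: a strict parse that fails on invalid input (the proposed TO_TIMESTAMP / CAST semantics) and a lenient variant that returns `null` (the TRY_CAST semantics). The `yyyy-MM-dd HH:mm:ss` pattern and the `TimestampParsing` class are assumptions for illustration, not Flink's runtime code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical sketch: strict parsing fails loudly, the "try" variant turns a
// parse failure into null. The timestamp pattern is an assumption.
final class TimestampParsing {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Proposed default: invalid input fails the job instead of producing null.
    static LocalDateTime toTimestamp(String input) {
        return LocalDateTime.parse(input, FORMAT); // throws DateTimeParseException
    }

    // Opt-in lenient behaviour, as TRY_CAST would provide.
    static LocalDateTime tryToTimestamp(String input) {
        try {
            return toTimestamp(input);
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```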
[jira] [Created] (FLINK-24902) Port boolean <-> numeric casting logic to CastRule
Francesco Guardiani created FLINK-24902: --- Summary: Port boolean <-> numeric casting logic to CastRule Key: FLINK-24902 URL: https://issues.apache.org/jira/browse/FLINK-24902 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani More details on the parent task -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24847) Decide the overflows behaviour
Francesco Guardiani created FLINK-24847: --- Summary: Decide the overflows behaviour Key: FLINK-24847 URL: https://issues.apache.org/jira/browse/FLINK-24847 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime Affects Versions: 1.14.0 Reporter: Francesco Guardiani Right now we have inconsistent behavior when it comes to overflows, depending on whether the value comes from a literal or from a value generated by the runtime (e.g. after a sum). In particular, I tracked down an issue when trying to execute {{CAST(9.2345682E9):INTEGER}}, which returns {{644633299}} instead of {{2147483647}} (the result of {{(int)9234567891.12f}}), because Calcite changes the type of the literal to INTEGER, completely skipping our casting logic in codegen and forcing us to generate a literal using {{literal.getValue().intValue()}} (note that Calcite uses {{BigDecimal}} for every numeric, no matter the type). Relevant code related to the issue: * {{RexBuilder#makeCast}} * {{GenerateUtils#generateLiteral}} This issue brings me to the following questions: * Should we throw an error on overflow? ** If yes, should this be the default behavior or just something we configure behind a flag? ** If no, should we return consistent and useful results on overflow (e.g. the max value)? *** If yes, what should those overflow values be? *** If no, do we keep everything as it is and document that users need to be careful about overflows? -- This message was sent by Atlassian Jira (v8.20.1#820001)
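For FLINK-24847, the two results quoted in the issue can be reproduced with plain Java, assuming the literal's `BigDecimal` value is `9234567891.12` (consistent with the reported `644633299`, since 9234567891 − 2 × 2^32 = 644633299). `BigDecimal#intValue` discards the fraction and keeps only the low-order 32 bits, whereas a Java narrowing cast from `float` to `int` saturates at `Integer.MAX_VALUE`. `OverflowDemo` is a hypothetical name.

```java
import java.math.BigDecimal;

// Reproduces the two overflow results from the issue with the JDK alone.
final class OverflowDemo {
    // Literal path: BigDecimal#intValue drops the fraction, then keeps only
    // the low-order 32 bits of the integer part (wrap-around).
    static int viaBigDecimal(String literal) {
        return new BigDecimal(literal).intValue();
    }

    // Cast path: narrowing a too-large float to int saturates at Integer.MAX_VALUE.
    static int viaFloatCast(float value) {
        return (int) value;
    }
}
```

`viaBigDecimal("9234567891.12")` yields `644633299` and `viaFloatCast(9234567891.12f)` yields `2147483647`, which is exactly the inconsistency between the literal and runtime paths described above.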
[jira] [Created] (FLINK-24843) DynamicTableFactory.Context.getCatalogTable().getPartitionKeys() should return indexes
Francesco Guardiani created FLINK-24843: --- Summary: DynamicTableFactory.Context.getCatalogTable().getPartitionKeys() should return indexes Key: FLINK-24843 URL: https://issues.apache.org/jira/browse/FLINK-24843 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Francesco Guardiani Right now invoking {{context.getCatalogTable().getPartitionKeys()}} returns field names. We should encourage users to use indexes, by having a new method in context or by adding a new method to CatalogTable that returns {{int[]}} and can be used in conjunction with {{DataType.excludeFields}} -- This message was sent by Atlassian Jira (v8.20.1#820001)
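For FLINK-24843, a sketch of the name-to-index conversion that an `int[]`-returning method would encapsulate, assuming the field names of the table schema are available in declaration order. `PartitionKeyIndexes` is hypothetical and not part of `CatalogTable` or `DynamicTableFactory.Context`.

```java
import java.util.List;

// Hypothetical helper: converts partition key names into indexes of the table
// schema fields, i.e. the int[] form proposed in the issue.
final class PartitionKeyIndexes {
    static int[] toIndexes(List<String> fieldNames, List<String> partitionKeys) {
        int[] indexes = new int[partitionKeys.size()];
        for (int i = 0; i < partitionKeys.size(); i++) {
            int idx = fieldNames.indexOf(partitionKeys.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown partition key: " + partitionKeys.get(i));
            }
            indexes[i] = idx;
        }
        return indexes;
    }
}
```

For fields `id, name, dt, hr` partitioned by `dt, hr`, the result is `{2, 3}`, which could then be passed to index-based helpers such as the `DataType.excludeFields` mentioned above.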
[jira] [Created] (FLINK-24781) Port from string casting logic to CastRule
Francesco Guardiani created FLINK-24781: --- Summary: Port from string casting logic to CastRule Key: FLINK-24781 URL: https://issues.apache.org/jira/browse/FLINK-24781 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24780) Port time to time casting logic to CastRule
Francesco Guardiani created FLINK-24780: --- Summary: Port time to time casting logic to CastRule Key: FLINK-24780 URL: https://issues.apache.org/jira/browse/FLINK-24780 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Port various timestamp related cast logic (timestamp to timestamp_ltz, timestamp_ltz to timestamp, timestamp to timestamp and timestamp_ltz to timestamp_ltz) to the new CastRule stack -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24779) Port Numeric casting logic to CastRule
Francesco Guardiani created FLINK-24779: --- Summary: Port Numeric casting logic to CastRule Key: FLINK-24779 URL: https://issues.apache.org/jira/browse/FLINK-24779 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Assignee: Francesco Guardiani For more details, check the parent issue -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24778) Add DataTypes#ROW(List)
Francesco Guardiani created FLINK-24778: --- Summary: Add DataTypes#ROW(List) Key: FLINK-24778 URL: https://issues.apache.org/jira/browse/FLINK-24778 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Ecosystem Reporter: Francesco Guardiani Assignee: Francesco Guardiani This method reduces friction when working with a {{Stream}} of fields, because it can be used with {{Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW)}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
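For FLINK-24778, a JDK-only illustration of why a `List`-accepting factory composes with streams: `Collectors.collectingAndThen` first collects into a `List`, then hands that list to the factory. `RowSketch` and its `ROW(List)` method are hypothetical stand-ins for `DataTypes#ROW(List)`, and fields are modelled as plain strings.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a row type whose factory accepts a List, mirroring
// the proposed DataTypes#ROW(List). Fields are plain strings here.
final class RowSketch {
    final List<String> fields;

    private RowSketch(List<String> fields) {
        this.fields = fields;
    }

    static RowSketch ROW(List<String> fields) {
        return new RowSketch(fields);
    }

    // With a List-accepting factory, a stream of fields collects straight into a row.
    static RowSketch fromStream(Stream<String> fields) {
        return fields.collect(Collectors.collectingAndThen(Collectors.toList(), RowSketch::ROW));
    }
}
```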
[jira] [Created] (FLINK-24776) Clarify DecodingFormat
Francesco Guardiani created FLINK-24776: --- Summary: Clarify DecodingFormat Key: FLINK-24776 URL: https://issues.apache.org/jira/browse/FLINK-24776 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Francesco Guardiani Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface has no clear requirements and is confusing for implementers. In particular, it's unclear whether the format needs to support projection push down or not, and whether the {{DataType}} provided to {{createRuntimeDecoder}} is projected and includes partition keys or not. An example of such a misunderstanding is shown here: https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107 The PR https://github.com/apache/flink/pull/17544 partially addresses the issue, because it removes the need for BulkFormat implementations to take care of partition key handling. Nevertheless, it's still unclear whether formats support projections or not, and whether they support nested projections. We should refactor {{DecodingFormat}} as follows: * We document that every {{DecodingFormat}} *MUST* support projections. This is already the case for every format we have (see https://github.com/apache/flink/pull/17544#issuecomment-953184692). A {{DecodingFormat}} *MAY* also support nested projections, and this is signaled by a new method {{DecodingFormat#supportsNestedProjection()}} * Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType, int[][] projections)}} that users should now implement. The {{physicalDataType}} in this signature is the physical data type from the table schema stripped of metadata columns and partition keys, with fields in the order defined by the table schema.
The user can compute the final type with {{DataType.projectFields(physicalDataType, projections)}}. * Deprecate the old {{createRuntimeDecoder}} * Provide default implementations for both the new and the old {{createRuntimeDecoder}} to ensure backward compatibility. As an alternative, we ([~twalthr] and I) explored the idea that formats might not support projection push down, but this is very unlikely and such a change would require several planner changes, including breaking the {{SupportsProjectionPushDown}} interface. We should also provide a {{RowData}} implementation that takes care of projection internally, so that a {{DecodingFormat}} implementer who doesn't want to support projections can just use this {{RowData}} wrapper like: {{new ProjectedRowData(rowDataProducedByFormat, projections)}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
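For FLINK-24776, a sketch of how a projecting wrapper could interpret the `int[][] projections` argument, assuming each inner array is a path into (possibly nested) rows, so `{2}` selects top-level field 2 and `{1, 0}` selects field 0 of the row stored in field 1. Rows are modelled as `Object[]`; `ProjectedRowSketch` is hypothetical and is not Flink's `ProjectedRowData` or `RowData`.

```java
// Hypothetical stand-in for the proposed projecting RowData wrapper, using
// Object[] as the row representation (nested rows are nested Object[]).
// Each projection is a path of indexes; access is lazy, nothing is copied.
final class ProjectedRowSketch {
    private final Object[] row;
    private final int[][] projections;

    ProjectedRowSketch(Object[] row, int[][] projections) {
        this.row = row;
        this.projections = projections;
    }

    // Number of fields visible after projection.
    int getArity() {
        return projections.length;
    }

    // Follows the index path of projection `pos` down through nested rows.
    Object getField(int pos) {
        Object current = row;
        for (int index : projections[pos]) {
            current = ((Object[]) current)[index];
        }
        return current;
    }
}
```

A format that only supports top-level projections would see single-element paths; nested paths are the case signaled by the proposed `supportsNestedProjection()`.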
[jira] [Created] (FLINK-24752) Cleanup ScalarOperatorGens#generateCast
Francesco Guardiani created FLINK-24752: --- Summary: Cleanup ScalarOperatorGens#generateCast Key: FLINK-24752 URL: https://issues.apache.org/jira/browse/FLINK-24752 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani https://issues.apache.org/jira/browse/FLINK-24462 introduced a new stack to implement cast rules. We need to port all our casting logic to the new stack. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24747) Add producedDataType to SupportsProjectionPushDown.applyProjection
Francesco Guardiani created FLINK-24747: --- Summary: Add producedDataType to SupportsProjectionPushDown.applyProjection Key: FLINK-24747 URL: https://issues.apache.org/jira/browse/FLINK-24747 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani {{SupportsProjectionPushDown.applyProjection}} should provide the {{producedDataType}} for consistency with {{SupportsReadingMetadata.applyReadableMetadata}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24685) Use the new casting rules in TableResult#print
Francesco Guardiani created FLINK-24685: --- Summary: Use the new casting rules in TableResult#print Key: FLINK-24685 URL: https://issues.apache.org/jira/browse/FLINK-24685 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Assignee: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24684) Port to string casting rules to the new CastRule interface
Francesco Guardiani created FLINK-24684: --- Summary: Port to string casting rules to the new CastRule interface Key: FLINK-24684 URL: https://issues.apache.org/jira/browse/FLINK-24684 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Francesco Guardiani Assignee: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24673) Deprecate old Row SerializationSchema/DeserializationSchema
Francesco Guardiani created FLINK-24673: --- Summary: Deprecate old Row SerializationSchema/DeserializationSchema Key: FLINK-24673 URL: https://issues.apache.org/jira/browse/FLINK-24673 Project: Flink Issue Type: Bug Components: Table SQL / Ecosystem Reporter: Francesco Guardiani Assignee: Francesco Guardiani As discussed in the [mailing list|https://lists.apache.org/thread.html/red2f04ac782f2cd156a639a44c9962b7b92659b76e5ff683de664534%40%3Cdev.flink.apache.org%3E], we should deprecate the following classes in the next version: - org.apache.flink.formats.json.JsonRowSerializationSchema - org.apache.flink.formats.json.JsonRowDeserializationSchema - org.apache.flink.formats.avro.AvroRowSerializationSchema - org.apache.flink.formats.avro.AvroRowDeserializationSchema - org.apache.flink.formats.csv.CsvRowDeserializationSchema - org.apache.flink.formats.csv.CsvRowSerializationSchema -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24617) Support partition keys through metadata
Francesco Guardiani created FLINK-24617: --- Summary: Support partition keys through metadata Key: FLINK-24617 URL: https://issues.apache.org/jira/browse/FLINK-24617 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Francesco Guardiani Assignee: Francesco Guardiani Right now we have a lot of code spread across the various formats to support partition keys. From the FileSystemTableSource point of view, partition keys can be handled by reusing the metadata support provided by https://issues.apache.org/jira/browse/FLINK-24165. We should clean up all that partition key support code and just reuse what we have for metadata. -- This message was sent by Atlassian Jira (v8.3.4#803005)
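Partition values depend only on the file path, which is why they fit the per-file metadata model. A minimal sketch, assuming a Hive-style {{key=value}} directory layout; {{PartitionPaths.extractPartitionSpec}} is a hypothetical helper, not the connector's actual code, and it does not handle escaped characters or default-partition names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionPaths {
    private PartitionPaths() {}

    // Extracts key=value directory segments from a file path, preserving their order,
    // e.g. "/warehouse/t/dt=2021-10-20/hour=12/part-0.csv" -> {dt=2021-10-20, hour=12}.
    static Map<String, String> extractPartitionSpec(String filePath) {
        Map<String, String> spec = new LinkedHashMap<>();
        String[] segments = filePath.split("/");
        // Skip the last segment: it is the file name, not a partition directory.
        for (int i = 0; i < segments.length - 1; i++) {
            String segment = segments[i];
            int eq = segment.indexOf('=');
            if (eq > 0) {
                spec.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return spec;
    }
}
```

Since the extraction needs nothing but the path, the source could expose partition columns the same way as any other file metadata, leaving formats unaware of partitioning.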
[jira] [Created] (FLINK-24616) Expose all metadata in FileStatus
Francesco Guardiani created FLINK-24616: --- Summary: Expose all metadata in FileStatus Key: FLINK-24616 URL: https://issues.apache.org/jira/browse/FLINK-24616 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani FileStatus provides several useful metadata fields, including access time, creation time, etc. We should expose them through the metadata support in the filesystem connector added by https://issues.apache.org/jira/browse/FLINK-24165 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24615) Add infrastructure to support metadata in filesystem connector
Francesco Guardiani created FLINK-24615: --- Summary: Add infrastructure to support metadata in filesystem connector Key: FLINK-24615 URL: https://issues.apache.org/jira/browse/FLINK-24615 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani {{FileSystemTableSource}} should implement {{SupportsReadingMetadata}}, and we need a way to propagate file system metadata without adding the code to every format. -- This message was sent by Atlassian Jira (v8.3.4#803005)
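A sketch of one way to propagate metadata without touching each format: decorate whatever rows a format produces and append the requested metadata values at the end. It assumes rows are plain {{Object[]}} arrays and that the per-file metadata is available as a map; {{MetadataAppendingIterator}} and keys such as {{file.path}} are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical decorator around the rows of any format reader (modeled as Object[]),
// appending the requested metadata values so formats need no metadata-specific code.
final class MetadataAppendingIterator implements Iterator<Object[]> {
    private final Iterator<Object[]> formatRows;
    private final Object[] metadataValues;

    MetadataAppendingIterator(
            Iterator<Object[]> formatRows, Map<String, Object> fileMetadata, List<String> requestedKeys) {
        this.formatRows = formatRows;
        // Metadata is constant for a file, so resolve it once per file rather than per row.
        this.metadataValues = new Object[requestedKeys.size()];
        for (int i = 0; i < requestedKeys.size(); i++) {
            String key = requestedKeys.get(i);
            if (!fileMetadata.containsKey(key)) {
                throw new IllegalArgumentException("Unknown metadata key: " + key);
            }
            metadataValues[i] = fileMetadata.get(key);
        }
    }

    @Override
    public boolean hasNext() {
        return formatRows.hasNext();
    }

    @Override
    public Object[] next() {
        Object[] physical = formatRows.next();
        // Physical columns first, then metadata columns in the requested order.
        Object[] joined = Arrays.copyOf(physical, physical.length + metadataValues.length);
        System.arraycopy(metadataValues, 0, joined, physical.length, metadataValues.length);
        return joined;
    }
}
```

Appending metadata after the physical columns mirrors how metadata columns are usually requested on top of the physical schema; the exact ordering a real source uses is an assumption here.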
[jira] [Created] (FLINK-24575) Port TestCsvFileSystemFormatFactory to DeserializationSchema
Francesco Guardiani created FLINK-24575: --- Summary: Port TestCsvFileSystemFormatFactory to DeserializationSchema Key: FLINK-24575 URL: https://issues.apache.org/jira/browse/FLINK-24575 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24566) Remove CsvFileSystemFormatFactory
Francesco Guardiani created FLINK-24566: --- Summary: Remove CsvFileSystemFormatFactory Key: FLINK-24566 URL: https://issues.apache.org/jira/browse/FLINK-24566 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani The existing DeserializationSchema implementation should be enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24565) Port Avro FileSystemFormatFactory to StreamFormat
Francesco Guardiani created FLINK-24565: --- Summary: Port Avro FileSystemFormatFactory to StreamFormat Key: FLINK-24565 URL: https://issues.apache.org/jira/browse/FLINK-24565 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani Assignee: Caizhi Weng -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24544) Failure when using Kafka connector in Table API with Avro and Confluent schema registry
Francesco Guardiani created FLINK-24544: --- Summary: Failure when using Kafka connector in Table API with Avro and Confluent schema registry Key: FLINK-24544 URL: https://issues.apache.org/jira/browse/FLINK-24544 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem Affects Versions: 1.13.1 Reporter: Francesco Guardiani Attachments: flink-deser-avro-enum.zip A user reported in the [mailing list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E] that Avro deserialization fails when using Kafka, Avro and Confluent Schema Registry: {code:java} Caused by: java.io.IOException: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103) ... 9 more {code} Look in the attached code for a reproducer -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24507) Cleanup all the various date/time/timestamp utils
Francesco Guardiani created FLINK-24507: --- Summary: Cleanup all the various date/time/timestamp utils Key: FLINK-24507 URL: https://issues.apache.org/jira/browse/FLINK-24507 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani There are several timestamp utils that won't be required anymore once FLINK-21456 is solved. For example: * {{org.apache.flink.table.util.TimestampStringUtils}} * {{org.apache.flink.table.utils.TimestampStringUtils}} Double-check all the usages of these classes, check whether there are other similar classes, and replace them with the new {{DateTimeUtils}} in table-common wherever possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24500) Move SqlDateTimeUtils to table-common
Francesco Guardiani created FLINK-24500: --- Summary: Move SqlDateTimeUtils to table-common Key: FLINK-24500 URL: https://issues.apache.org/jira/browse/FLINK-24500 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24499) OVER IGNORE NULLS support
Francesco Guardiani created FLINK-24499: --- Summary: OVER IGNORE NULLS support Key: FLINK-24499 URL: https://issues.apache.org/jira/browse/FLINK-24499 Project: Flink Issue Type: New Feature Reporter: Francesco Guardiani Window functions used with an OVER clause accept two keywords that specify how to deal with null values: IGNORE NULLS and RESPECT NULLS. They are specified in SQL 2008, section 6.10. -- This message was sent by Atlassian Jira (v8.3.4#803005)
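To illustrate the difference between the two null treatments, the sketch below computes {{LAG(value, 1)}} over a single ordered partition: with IGNORE NULLS it returns the closest preceding non-null value, otherwise (RESPECT NULLS, the standard's default) it returns the previous row's value even when that value is null. {{NullTreatment.lag}} is a hypothetical helper and only covers an offset of 1.

```java
import java.util.ArrayList;
import java.util.List;

final class NullTreatment {
    private NullTreatment() {}

    // LAG(value, 1) over one ordered partition; the first row has no predecessor (null).
    static List<Integer> lag(List<Integer> values, boolean ignoreNulls) {
        List<Integer> result = new ArrayList<>();
        Integer lastNonNull = null;
        for (int i = 0; i < values.size(); i++) {
            if (ignoreNulls) {
                // IGNORE NULLS: skip null predecessors and use the closest non-null one.
                result.add(lastNonNull);
            } else {
                // RESPECT NULLS: use the immediate predecessor, null or not.
                result.add(i == 0 ? null : values.get(i - 1));
            }
            if (values.get(i) != null) {
                lastNonNull = values.get(i);
            }
        }
        return result;
    }
}
```

For the input (1, null, null, 4), RESPECT NULLS yields (null, 1, null, null) while IGNORE NULLS yields (null, 1, 1, 1).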
[jira] [Created] (FLINK-24466) Interval Join late events handling behaviour is not consistent
Francesco Guardiani created FLINK-24466: --- Summary: Interval Join late events handling behaviour is not consistent Key: FLINK-24466 URL: https://issues.apache.org/jira/browse/FLINK-24466 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: Francesco Guardiani Attachments: Fix_late_events_filtering_for_interval_join.patch Interval Join handles late events by emitting them in the output as padded rows. This behavior is also tested extensively in {{RowTimeIntervalJoinTest}}. The problem with this behavior is the way an event is classified as "late" or not: to distinguish between the two, {{RowTimeIntervalJoin}} uses {{ctx.timerService().currentWatermark()}} to find out whether an event is older than the last received watermark. But that method returns the "combined" watermark across all the keys, partitions and *input streams*, that is, the minimum of the two inputs' watermarks: if one of the two streams progresses "slower" than the other, the returned watermark follows the slower one. This means that our late events handling effectively works only if the two streams run "at the same pace"; otherwise, events that we should consider _late_ for the faster stream are joined anyway. To observe this behavior, run the test {{IntervalJoinITCase#testRowTimeInnerJoinWithEquiTimeAttrs}} at revision https://github.com/apache/flink/commit/7033cbfe404bea1519d3342a611e2f92768d70f9 several times: after a couple of runs it fails, joining one of the {{"should-be-discarded"}} records. Those records are far behind the watermark, which the test defines with a 1 second delay. The patch attached to the issue shows how this could be fixed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
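The effect can be reproduced with plain numbers. A minimal sketch, assuming an event counts as late when its timestamp is at or below the watermark and ignoring the join's time bounds; {{TwoInputWatermarks}} is hypothetical, and the per-input check is only one possible direction, not necessarily what the attached patch does.

```java
// Hypothetical watermark bookkeeping for a two-input operator (input 0 = left, 1 = right).
final class TwoInputWatermarks {
    private long left = Long.MIN_VALUE;
    private long right = Long.MIN_VALUE;

    void advance(int input, long watermark) {
        checkInput(input);
        // Watermarks never move backwards.
        if (input == 0) {
            left = Math.max(left, watermark);
        } else {
            right = Math.max(right, watermark);
        }
    }

    // Combined watermark of a two-input operator: the minimum of both inputs.
    long combined() {
        return Math.min(left, right);
    }

    long ofInput(int input) {
        checkInput(input);
        return input == 0 ? left : right;
    }

    // Late check as described in the issue: compare against the combined watermark.
    boolean isLateCombined(long timestamp) {
        return timestamp <= combined();
    }

    // Alternative: compare against the watermark of the input the event arrived on.
    boolean isLatePerInput(int input, long timestamp) {
        return timestamp <= ofInput(input);
    }

    private static void checkInput(int input) {
        if (input != 0 && input != 1) {
            throw new IllegalArgumentException("Input must be 0 or 1, got " + input);
        }
    }
}
```

With the left watermark at 10000 and the right one at 1000, the combined watermark is 1000, so a left event with timestamp 5000 passes the combined check even though it is behind its own input's watermark.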
[jira] [Created] (FLINK-24462) Refactor casting rules in a similar fashion to DataStructureConverter
Francesco Guardiani created FLINK-24462: --- Summary: Refactor casting rules in a similar fashion to DataStructureConverter Key: FLINK-24462 URL: https://issues.apache.org/jira/browse/FLINK-24462 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24461) TableResult#print() should use internal data types
Francesco Guardiani created FLINK-24461: --- Summary: TableResult#print() should use internal data types Key: FLINK-24461 URL: https://issues.apache.org/jira/browse/FLINK-24461 Project: Flink Issue Type: Sub-task Reporter: Francesco Guardiani The collector used by {{TableResult#print()}} should use internal data types -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24363) java.io.FileNotFoundException crashes the standalone session manager
Francesco Guardiani created FLINK-24363: --- Summary: java.io.FileNotFoundException crashes the standalone session manager Key: FLINK-24363 URL: https://issues.apache.org/jira/browse/FLINK-24363 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Table SQL / Runtime Affects Versions: shaded-14.0 Environment: Flink 1.14-SNAPSHOT Reporter: Francesco Guardiani When trying to execute a query reading from a table configured with the filesystem connector and a bad path, the session manager crashes and the SQL client query execution times out: {code:java} org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_2. at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:629) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.actor.Actor.aroundReceive(Actor.scala:537) 
~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292] Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster. 
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:391) ~[flink-dist_2.11-1.14.0.jar:1.14.0] at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.14.0.jar:1.14.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_a0e54fcb-a903-4676-9e88-cac15e82c6ff.jar:1.14.0] ... 20 more Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0] at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:584) ~[flink-dist_2.11-1.14.0.jar:1.14.0] at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.11-1.14.0.jar:1.14.0] at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882)
[jira] [Created] (FLINK-24359) Migrate FileSystem connector to ResolvedSchema
Francesco Guardiani created FLINK-24359: --- Summary: Migrate FileSystem connector to ResolvedSchema Key: FLINK-24359 URL: https://issues.apache.org/jira/browse/FLINK-24359 Project: Flink Issue Type: New Feature Components: Table SQL / Ecosystem Environment: Flink 1.14-SNAPSHOT Reporter: Francesco Guardiani The filesystem connector uses the deprecated TableSchema APIs. This causes issues with the Table API, because TableSchema#fromResolvedSchema(ResolvedSchema) requires the expressions to be serializable as strings (ResolvedExpression#asSerializableString). For example:
{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
    .schema(
        Schema.newBuilder()
            .column("character", DataTypes.STRING())
            .column("latitude", DataTypes.STRING())
            .column("longitude", DataTypes.STRING())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", $("time").minus(lit(2).seconds()))
            .build())
    // Other options
    .build();
{code}
When used in a table pipeline, this descriptor throws the following exception: {code:java} Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation. 
at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976) at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54) at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52) at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91) at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145) {code} The same table definition using SQL works fine:
{code:java}
CREATE TABLE IF NOT EXISTS LocationEvents (
  `character` STRING,
  `latitude` STRING,
  `longitude` STRING,
  `time` TIMESTAMP(3),
  WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
) WITH (
  -- Load from filesystem
  'connector' = 'filesystem',
  --- Other configs
);
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24225) Define and implement UPSERT INTO semantics
Francesco Guardiani created FLINK-24225: --- Summary: Define and implement UPSERT INTO semantics Key: FLINK-24225 URL: https://issues.apache.org/jira/browse/FLINK-24225 Project: Flink Issue Type: Bug Reporter: Francesco Guardiani In https://issues.apache.org/jira/browse/FLINK-22942, UPSERT INTO was disabled. We should define the correct behaviour for UPSERT INTO and implement it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24214) A submit job failure crashes the sql client
Francesco Guardiani created FLINK-24214: --- Summary: A submit job failure crashes the sql client Key: FLINK-24214 URL: https://issues.apache.org/jira/browse/FLINK-24214 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.13.2 Environment: Flink 1.13.2 Ubuntu 21.04 Java 8 Reporter: Francesco Guardiani I've noticed that when executing a valid query, if a "bad" error occurs while submitting it to the Flink cluster, the client crashes, and the beginning of the stack trace is misleading. For example: {code:java} Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) Caused by: java.lang.RuntimeException: Error running SQL job. at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:606) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537) at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:603) ... 
8 more Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunnerImpl.java:152) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 
7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment) -> SourceConversion(table=[default_catalog.default_database.prod_orders, source: [KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment)]], fields=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment]) -> Timestamps/Watermarks -> SinkConversionToRow -> Map -> Sink: CsvTableSink(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency,
[jira] [Created] (FLINK-24170) Building a fresh clone with latest Maven fails
Francesco Guardiani created FLINK-24170: --- Summary: Building a fresh clone with latest Maven fails Key: FLINK-24170 URL: https://issues.apache.org/jira/browse/FLINK-24170 Project: Flink Issue Type: Bug Components: Build System Environment: Maven: 3.8.2 JDK: 8 OS: Ubuntu 21.04 Reporter: Francesco Guardiani Building with the latest Maven fails during the {{highest-dir}} goal: {quote}{{Cannot find a single highest directory for this project set. First two candidates directories don't share a common root. }} {quote} As suggested on this SO answer, the {{directory-of}} goal works better -- This message was sent by Atlassian Jira (v8.3.4#803005)