[GitHub] [flink] luoyuxia commented on a diff in pull request #22977: [FLINK-32569][table] Fix the incomplete serialization of ResolvedCatalogTable caused by the newly introduced time travel interface
luoyuxia commented on code in PR #22977: URL: https://github.com/apache/flink/pull/22977#discussion_r1263367999

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:

@@ -79,6 +80,11 @@ public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable)
         properties.put(COMMENT, comment);
     }

+    final Optional<Long> snapshot = resolvedTable.getSnapshot();
+    if (snapshot.isPresent()) {

Review Comment:
```suggestion
        snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId)));
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
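The reviewer's suggestion replaces the `isPresent()`/`get()` pair with `Optional.ifPresent`. A minimal stand-alone sketch of that idiom (the `SNAPSHOT` key value and class name here are stand-ins for illustration, not Flink's actual constants):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SnapshotSerialization {
    // Stand-in for the SNAPSHOT property key used in CatalogPropertiesUtil.
    private static final String SNAPSHOT = "snapshot";

    // Serializes an optional snapshot id into the property map, mirroring the
    // review suggestion: ifPresent replaces the isPresent()/get() pair.
    static void putSnapshot(Map<String, String> properties, Optional<Long> snapshot) {
        snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId)));
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        putSnapshot(properties, Optional.of(42L));
        System.out.println(properties.get(SNAPSHOT)); // prints "42"
        putSnapshot(properties, Optional.empty()); // empty Optional: no entry added, no exception
    }
}
```

The point of the suggestion is that the deserialization round-trip only works if the snapshot id is written at all; `ifPresent` makes the write unconditional-on-presence in one expression.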
[jira] [Resolved] (FLINK-32020) Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288
[ https://issues.apache.org/jira/browse/FLINK-32020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren resolved FLINK-32020.

Resolution: Fixed

> Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288
>
> Key: FLINK-32020
> URL: https://issues.apache.org/jira/browse/FLINK-32020
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kafka
> Affects Versions: kafka-3.0.0
> Reporter: Hongshun Wang
> Assignee: Hongshun Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: kafka-4.0.0
>
> As described in [FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source], dynamic partition discovery is disabled by default, and users have to specify the interval of discovery in order to turn it on.
> This subtask is to enable dynamic partition discovery by default in the Kafka source.
> Partition discovery is performed on the KafkaSourceEnumerator, which asynchronously fetches topic metadata from the Kafka cluster and checks if there are any new topics and partitions. This should not cause performance issues on the Flink side.
> On the Kafka broker side, partition discovery sends a MetadataRequest to the Kafka broker to fetch topic information. Considering that the Kafka broker has its metadata cache and the default request frequency is relatively low (once every 30 seconds), this is not a heavy operation, and the broker's performance will not be significantly affected.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
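For reference, before this change users had to pass the discovery interval explicitly to turn discovery on. A minimal sketch of the consumer properties involved, assuming the `partition.discovery.interval.ms` key from the Kafka connector documentation (the concrete value is an example matching the 30-second frequency mentioned above, not the FLIP-288 default):

```java
import java.util.Properties;

public class PartitionDiscoveryConfig {
    // Builds the properties a KafkaSource would receive, e.g. via
    // KafkaSource.builder().setProperties(...). Treat the interval value
    // as an example; the default applied by FLIP-288 is decided by the connector.
    static Properties discoveryProps(long intervalMs) {
        Properties props = new Properties();
        props.setProperty("partition.discovery.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = discoveryProps(30_000L);
        System.out.println(props.getProperty("partition.discovery.interval.ms")); // prints "30000"
    }
}
```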
[jira] [Resolved] (FLINK-31953) FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source
[ https://issues.apache.org/jira/browse/FLINK-31953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren resolved FLINK-31953.

Resolution: Fixed

> FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source
>
> Key: FLINK-31953
> URL: https://issues.apache.org/jira/browse/FLINK-31953
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Hongshun Wang
> Assignee: Hongshun Wang
> Priority: Major
> Labels: Connector, kafka-source, pull-request-available
> Fix For: kafka-4.0.0
>
> This improvement implements [FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source].
> This FLIP has three main objectives:
> # Enable partition discovery by default.
> # Provide an *EARLIEST* strategy for later-discovered partitions.
> # Organize the code logic of the current built-in OffsetsInitializer, and update the JavaDoc to make the behavior clear to users.
> Each objective corresponds to a sub-task.
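The second objective can be illustrated with a small stand-alone sketch (plain Java, all names invented for illustration): partitions present at job start use the user-configured starting offset, while partitions discovered later start from EARLIEST so no records written to them are missed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OffsetStrategySketch {
    // Sentinel mirroring Kafka's "earliest" offset semantics.
    static final long EARLIEST = -2L;

    // Assigns a starting offset per partition: partitions known at job start
    // get the user-configured starting offset; partitions that show up later
    // (via periodic discovery) read from EARLIEST.
    static Map<String, Long> startingOffsets(
            Set<String> initialPartitions, Set<String> discovered, long configuredStart) {
        Map<String, Long> offsets = new HashMap<>();
        for (String p : initialPartitions) {
            offsets.put(p, configuredStart);
        }
        for (String p : discovered) {
            offsets.putIfAbsent(p, EARLIEST); // only newly discovered partitions fall through
        }
        return offsets;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets =
                startingOffsets(Set.of("topic-0"), Set.of("topic-0", "topic-1"), 100L);
        System.out.println(offsets.get("topic-0")); // prints "100" (configured start)
        System.out.println(offsets.get("topic-1")); // prints "-2" (EARLIEST for the new partition)
    }
}
```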
[jira] [Commented] (FLINK-32020) Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288
[ https://issues.apache.org/jira/browse/FLINK-32020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743029#comment-17743029 ] Qingsheng Ren commented on FLINK-32020:

Merged to main: 811716c5155e82fa3bfc47ced53daef53bb99cce

> Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288
> Key: FLINK-32020
> URL: https://issues.apache.org/jira/browse/FLINK-32020
[GitHub] [flink-connector-kafka] PatrickRen closed pull request #40: [FLINK-32020] Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288
PatrickRen closed pull request #40: [FLINK-32020] Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288 URL: https://github.com/apache/flink-connector-kafka/pull/40
[GitHub] [flink] jiaoqingbo commented on pull request #22992: [hotfix] Fix typo in JobManagerOptions
jiaoqingbo commented on PR #22992: URL: https://github.com/apache/flink/pull/22992#issuecomment-1635348356

My modification does not seem to be related to the CI error. Please take a look, cc @wanglijie95
[jira] [Commented] (FLINK-32335) Fix the Flink ML unittest failure
[ https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743010#comment-17743010 ] Dong Lin commented on FLINK-32335:

Merged to apache/flink-ml master branch 892fb4714c6f7e7055f26b4a2d1e1b36094500be

> Fix the Flink ML unittest failure
>
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
> Project: Flink
> Issue Type: New Feature
> Components: Library / Machine Learning
> Reporter: Jiang Xin
> Assignee: Jiang Xin
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.4.0
>
> The [github CI](https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620) of Flink ML failed because of the following exception.
>
> {code:java}
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
>     at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
>     at org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
>     at org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750){code}
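The exception class in the trace above can be reproduced in isolation: it is thrown when a collection is structurally modified while an iterator over it is still active. A minimal sketch (note the trace involves `ArrayDeque`, whose fail-fast check is documented as best-effort; this sketch uses `ArrayList`, whose `modCount` check throws deterministically, to demonstrate the same bug class):

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class CmeDemo {
    // Structurally modifies a list while a for-each iterator over it is active,
    // triggering the iterator's fail-fast modCount check.
    static boolean modifyWhileIterating() {
        List<Integer> events = new ArrayList<>(List.of(1, 2));
        try {
            for (Integer e : events) {
                events.add(e + 10); // concurrent structural modification
            }
        } catch (ConcurrentModificationException ex) {
            return true; // the iterator detected the modification
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(modifyWhileIterating()); // prints "true"
    }
}
```

The usual fix, as in such operator code, is to drain events into a separate collection (or use the iterator's own `remove`) rather than mutating the collection being iterated.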
[jira] [Resolved] (FLINK-32335) Fix the Flink ML unittest failure
[ https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved FLINK-32335.

Resolution: Fixed

> Fix the Flink ML unittest failure
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
[jira] [Commented] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743009#comment-17743009 ] Yunhong Zheng commented on FLINK-32579:

Hi, [~jasonliangyc]. I got it.

For question 1: why is there no filter condition "p.name = ''" in the LookupJoin RelNode?
* This is expected, because the filter condition has been pushed down to the JDBC source (the JDBC source supports filter pushdown). A pushed-down condition is not displayed in the LookupJoin node.

For question 2: why is the join result wrong?
* I think this is a bug in the JDBC lookup source: it does not apply the pushed-down filter condition correctly. After reading the code, I suspect the JDBC source does not process this filter condition for the dimension table.
* To verify this quickly, you can disable filter pushdown via the config option 'table.optimizer.source.predicate-pushdown-enabled'.
* Also, after verifying that this error is caused by the JDBC source, you can ask [~ruanhang1993] to take a look.
> The filter criteria on the lookup table of Lookup join has no effect
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.17.0, 1.17.1
> Reporter: jasonliangyc
> Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, image-2023-07-13-22-43-45-957.png, test_case.sql
>
> *1.* I joined two tables using a lookup join with the query below in sql-client. The filter criterion (p.name = '??') didn't show up in the execution detail, and rows were returned based on only one condition (cdc.product_id = p.id).
> {code:java}
> select
>     cdc.order_id,
>     cdc.order_date,
>     cdc.customer_name,
>     cdc.price,
>     p.name
> FROM orders AS cdc
> left JOIN products
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>
> *2.* It showed weird results when I changed the query as below, because there is no row in the table (products) whose 'name' column has the value '??', and the execution detail didn't show the where criterion.
> {code:java}
> select
>     cdc.order_id,
>     cdc.order_date,
>     cdc.customer_name,
>     cdc.price,
>     p.name
> FROM orders AS cdc
> left JOIN products
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
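The suspected mechanics can be sketched without Flink: a lookup join fetches dimension rows by key, and any residual predicate on the dimension table must then be applied to the fetched rows. If the source claims to have consumed the pushed-down filter but silently drops it, non-matching rows leak into the result. A minimal, hypothetical model (all names invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class LookupJoinSketch {
    // Dimension table keyed by product id; values are product names.
    static final Map<Integer, String> PRODUCTS = Map.of(1, "apple", 2, "pear");

    // Joins each order's productId against the dimension table, applying the
    // residual filter to the looked-up row. Dropping the filter (the suspected
    // bug) would let non-matching dimension rows through.
    static List<String> lookupJoin(List<Integer> orderProductIds, Predicate<String> residualFilter) {
        List<String> result = new ArrayList<>();
        for (int id : orderProductIds) {
            String name = PRODUCTS.get(id);
            if (name != null && residualFilter.test(name)) {
                result.add(id + ":" + name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // With the filter applied correctly, no product is named "??",
        // so no dimension row should match.
        System.out.println(lookupJoin(List.of(1, 2), n -> n.equals("??"))); // prints "[]"
        // If the filter were silently dropped, rows would appear anyway.
        System.out.println(lookupJoin(List.of(1, 2), n -> true)); // prints "[1:apple, 2:pear]"
    }
}
```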
[jira] [Assigned] (FLINK-32335) Fix the Flink ML unittest failure
[ https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin reassigned FLINK-32335:

Assignee: Jiang Xin

> Fix the Flink ML unittest failure
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
[GitHub] [flink-ml] lindong28 merged pull request #247: [FLINK-32335] Fix the ConcurrentModificationException in the unittest
lindong28 merged PR #247: URL: https://github.com/apache/flink-ml/pull/247
[GitHub] [flink-ml] lindong28 commented on pull request #247: [FLINK-32335] Fix the ConcurrentModificationException in the unittest
lindong28 commented on PR #247: URL: https://github.com/apache/flink-ml/pull/247#issuecomment-1635252080

Thanks for the PR. LGTM.
[jira] [Created] (FLINK-32590) Fail to read flink parquet filesystem table stored in hive metastore service.
Guozhen Yang created FLINK-32590:

Summary: Fail to read flink parquet filesystem table stored in hive metastore service.
Key: FLINK-32590
URL: https://issues.apache.org/jira/browse/FLINK-32590
Project: Flink
Issue Type: Bug
Components: Connectors / Hive, Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.1
Reporter: Guozhen Yang

h2. Summary:

Fail to read flink parquet filesystem table stored in hive metastore service.

h2. The problem:

When I try to read a flink parquet filesystem table stored in hive metastore service, I got the following exception.

{noformat}
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) ~[flink-connector-files-1.17.1.jar:1.17.1]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.17.1.jar:1.17.1]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) ~[flink-connector-files-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_345]
Caused by: java.lang.NoSuchMethodError: shaded.parquet.org.apache.thrift.TBaseHelper.hashCode(J)I
    at org.apache.parquet.format.ColumnChunk.hashCode(ColumnChunk.java:812) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at java.util.AbstractList.hashCode(AbstractList.java:541) ~[?:1.8.0_345]
    at org.apache.parquet.format.RowGroup.hashCode(RowGroup.java:704) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at java.util.HashMap.hash(HashMap.java:340) ~[?:1.8.0_345]
    at java.util.HashMap.put(HashMap.java:613) ~[?:1.8.0_345]
    at org.apache.parquet.format.converter.ParquetMetadataConverter.generateRowGroupOffsets(ParquetMetadataConverter.java:1411) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.format.converter.ParquetMetadataConverter.access$600(ParquetMetadataConverter.java:144) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1461) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1437) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.format.converter.ParquetMetadataConverter$RangeMetadataFilter.accept(ParquetMetadataConverter.java:1207) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:1437) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:583) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:777) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:127) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat
{noformat}
[GitHub] [flink] flinkbot commented on pull request #22994: [FLINK-32354][table] Supports executing call procedure statement
flinkbot commented on PR #22994: URL: https://github.com/apache/flink/pull/22994#issuecomment-1635219756

## CI report:

* c7abe7f02a916120208e6f9b80513bf6b6385b9a UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32354) Support to execute the call procedure operation
[ https://issues.apache.org/jira/browse/FLINK-32354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32354:

Labels: pull-request-available (was: )

> Support to execute the call procedure operation
>
> Key: FLINK-32354
> URL: https://issues.apache.org/jira/browse/FLINK-32354
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Runtime
> Reporter: luoyuxia
> Assignee: luoyuxia
> Priority: Major
> Labels: pull-request-available
[GitHub] [flink] luoyuxia opened a new pull request, #22994: [FLINK-32354][table] Supports executing call procedure statement
luoyuxia opened a new pull request, #22994: URL: https://github.com/apache/flink/pull/22994

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263236647

## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:

@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Collection of {@link ConfigOption} used in GlueCatalog. */
+@Internal
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final String IDENTIFIER = "glue";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();

Review Comment:
explained here [link](https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263235270)
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263235270 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import java.util.HashSet; +import java.util.Set; + +/** Collection of {@link ConfigOption} used in GlueCatalog. 
*/ +@Internal +public class GlueCatalogOptions extends CommonCatalogOptions { + +public static final String IDENTIFIER = "glue"; +public static final ConfigOption<String> DEFAULT_DATABASE = +ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) +.stringType() +.defaultValue(GenericInMemoryCatalog.DEFAULT_DB); + +public static final ConfigOption<String> INPUT_FORMAT = +ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT) +.stringType() +.noDefaultValue(); + +public static final ConfigOption<String> OUTPUT_FORMAT = +ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT) +.stringType() +.noDefaultValue(); + +public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT = +ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT) +.stringType() +.noDefaultValue(); + +public static final ConfigOption<String> GLUE_CATALOG_ID = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue(); + +public static final ConfigOption<String> GLUE_ACCOUNT_ID = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue(); Review Comment: In the `GlueCatalogOptions` class, `GLUE_ACCOUNT_ID` is an optional config that users can pass, in combination with `GLUE_CATALOG_ENDPOINT`, to connect to a different Glue account. One use case: a user creates two catalogs in the same session, one pointing to a Glue Data Catalog (possibly in a different AWS account) and the other to a different Glue account. It is used in the method ``` public static Set<ConfigOption<?>> getAllConfigOptions() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
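For readers unfamiliar with the builder chain behind these declarations: each call like `ConfigOptions.key(...).stringType().noDefaultValue()` yields a typed `ConfigOption<String>`, which is why the stripped generics above matter. Below is a simplified, self-contained sketch of that pattern. The class and method names mirror Flink's `ConfigOptions` API, but this is an illustrative re-implementation, not Flink's actual code:

```java
import java.util.Optional;

// Minimal stand-in for Flink's ConfigOption<T>: a typed key plus an optional default.
final class ConfigOption<T> {
    private final String key;
    private final T defaultValue; // null when no default exists

    ConfigOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() { return key; }
    Optional<T> defaultValue() { return Optional.ofNullable(defaultValue); }
}

// Minimal stand-in for Flink's ConfigOptions builder entry point.
final class ConfigOptions {
    static Builder key(String key) { return new Builder(key); }

    static final class Builder {
        private final String key;
        Builder(String key) { this.key = key; }

        // Fixing the value type to String yields a typed builder,
        // so the resulting option is ConfigOption<String>.
        TypedBuilder<String> stringType() { return new TypedBuilder<>(key); }
    }

    static final class TypedBuilder<T> {
        private final String key;
        TypedBuilder(String key) { this.key = key; }

        ConfigOption<T> defaultValue(T value) { return new ConfigOption<>(key, value); }
        ConfigOption<T> noDefaultValue() { return new ConfigOption<>(key, null); }
    }
}

public class GlueOptionsSketch {
    // Mirrors DEFAULT_DATABASE from the diff ("default" is the in-memory catalog's default db name).
    static final ConfigOption<String> DEFAULT_DATABASE =
            ConfigOptions.key("default-database").stringType().defaultValue("default");

    // Mirrors GLUE_CATALOG_ENDPOINT: no default, so the option is optional.
    static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
            ConfigOptions.key("aws.glue.endpoint").stringType().noDefaultValue();

    public static void main(String[] args) {
        System.out.println(DEFAULT_DATABASE.defaultValue().orElse("<none>"));      // default
        System.out.println(GLUE_CATALOG_ENDPOINT.defaultValue().orElse("<none>")); // <none>
    }
}
```

The design point being reviewed in this thread follows from the pattern: options with `noDefaultValue()` (endpoint, catalog ID, account ID) are optional knobs the user may omit entirely.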
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263233465 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/AWSGlueConfigConstants.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.constants; + +import org.apache.flink.annotation.PublicEvolving; + +/** Configuration keys for AWS Glue Data Catalog service usage. */ +@PublicEvolving +public class AWSGlueConfigConstants { + +/** + * Configure an alternative endpoint of the Glue service for GlueCatalog to access. + * + * This could be used to use GlueCatalog with any glue-compatible metastore service that has + * a different endpoint + */ +public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint"; Review Comment: This constant is used in defining the config options for the catalog: ``` public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT = ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT) .stringType() .noDefaultValue(); ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263232863 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/AWSGlueConfigConstants.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.constants; + +import org.apache.flink.annotation.PublicEvolving; + +/** Configuration keys for AWS Glue Data Catalog service usage. */ +@PublicEvolving +public class AWSGlueConfigConstants { + +/** + * Configure an alternative endpoint of the Glue service for GlueCatalog to access. + * + * This could be used to use GlueCatalog with any glue-compatible metastore service that has + * a different endpoint + */ +public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint"; + +/** + * The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue + * automatically uses the caller's AWS account ID by default. + * + * For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
 + */ +public static final String GLUE_CATALOG_ID = "aws.glue.id"; + +/** + * The account ID used in a Glue resource ARN, e.g. + * arn:aws:glue:us-east-1:1:table/db1/table1 + */ +public static final String GLUE_ACCOUNT_ID = "aws.glue.account-id"; Review Comment: It is used in ``` public static final ConfigOption<String> GLUE_ACCOUNT_ID = ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee resolved FLINK-28743. - Resolution: Fixed fixed in master: 8829b5e60a71b871462d1d2bb849c926b0de9b80 > Support validating the determinism for StreamPhysicalMatchRecognize > --- > > Key: FLINK-28743 > URL: https://issues.apache.org/jira/browse/FLINK-28743 > Project: Flink > Issue Type: Sub-task >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > > MatchRecognize has complex expressions and is not commonly used in > traditional SQLs, so mark this as a minor issue (for 1.16) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lincoln-lil merged pull request #22981: [FLINK-28743][table-planner] Supports validating the determinism for StreamPhysicalMatchRecognize
lincoln-lil merged PR #22981: URL: https://github.com/apache/flink/pull/22981 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] jiaoqingbo commented on pull request #22992: [hotfix] Fix typo in JobManagerOptions
jiaoqingbo commented on PR #22992: URL: https://github.com/apache/flink/pull/22992#issuecomment-1635163141 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-32481) Support type inference for procedure
[ https://issues.apache.org/jira/browse/FLINK-32481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-32481. -- Resolution: Fixed master: e5324c085f627df2f8e452b0aec3264fe0c6f6f6 > Support type inference for procedure > > > Key: FLINK-32481 > URL: https://issues.apache.org/jira/browse/FLINK-32481 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Runtime >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > > Currently, FunctionMappingExtractor can only handle type inference for functions. We can extend it so that it can also handle procedures. Since procedures are quite similar to functions, we can reuse the stack/code of {{{}FunctionMappingExtractor{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] luoyuxia merged pull request #22904: [FLINK-32481][table] Support type inference for procedure
luoyuxia merged PR #22904: URL: https://github.com/apache/flink/pull/22904 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] luoyuxia commented on pull request #22904: [FLINK-32481][table] Support type inference for procedure
luoyuxia commented on PR #22904: URL: https://github.com/apache/flink/pull/22904#issuecomment-1635130068 @LadyForest Thanks for reviewing. Merging... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-32587) The results returned from the CDC sql query are null or the value was changed unexpectly
[ https://issues.apache.org/jira/browse/FLINK-32587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jasonliangyc closed FLINK-32587. Resolution: Cannot Reproduce > The results returned from the CDC sql query are null or the value was changed > unexpectedly > > > Key: FLINK-32587 > URL: https://issues.apache.org/jira/browse/FLINK-32587 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.17.0, 1.17.1 >Reporter: jasonliangyc >Priority: Critical > Attachments: image-2023-07-13-17-35-32-235.png, > image-2023-07-13-17-37-56-908.png > > > I created a CDC table as below and then ran the query 'select * from so_cdc' > through sql-client; it gave me unexpected results. > {code:java} > CREATE TABLE so_cdc ( > REC_ID STRING, > Create_Date TIMESTAMP(3), > PRIMARY KEY (REC_ID) NOT ENFORCED > ) WITH ( > 'connector' = 'sqlserver-cdc', > 'hostname' = '', > 'port' = '', > 'username' = 'xxx', > 'password' = '', > 'database-name' = '', > 'schema-name' = '', > 'table-name' = 'xxx', > 'scan.startup.mode' = 'latest-offset' > ); {code} > Running the query for the first time, the data looked normal. > !image-2023-07-13-17-35-32-235.png|width=535,height=141! > > But after I ran the same query multiple times, it returned unexpected > data, and I'm sure these two columns of my CDC source table don't > contain this data: > !image-2023-07-13-17-37-56-908.png|width=469,height=175! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31830) Coalesce on nested fields with different nullabilities will get wrong plan
[ https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742968#comment-17742968 ] Sergey Nuyanzin commented on FLINK-31830: - Thanks for the very detailed investigation. The main issue here could be connector support: connectors are tested against master and the two previous major releases... Even if such changes are applied to master, connectors using these types cannot benefit from them for a long period of time, since they still have to support old releases. I wonder if we can keep the current method/constructor signature and e.g. tweak {{org.apache.flink.table.api.DataTypes#ROW(org.apache.flink.table.api.DataTypes.Field...)}} so that, if at least one field is not null, the initial nullability of the record is set to {{false}}... Would it work that way? > Coalesce on nested fields with different nullabilities will get wrong plan > -- > > Key: FLINK-31830 > URL: https://issues.apache.org/jira/browse/FLINK-31830 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.14.6 >Reporter: lincoln lee >Assignee: Jane Chan >Priority: Major > Attachments: image-2023-06-09-15-06-01-322.png, > image-2023-06-09-15-21-13-720.png > > > A test case similar to FLINK-31829, only changing the nullable field `a.np` to > not null, gets a wrong plan in 1.14.x (reported by a community user): > {code} > @Test > def testCoalesceOnNestedColumns(): Unit = { > val tEnv = util.tableEnv > val tableDescriptor = TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder > .column("id", DataTypes.INT.notNull) > .column("a", DataTypes.ROW(DataTypes.FIELD("np", > DataTypes.INT.notNull())).nullable) > .build) > .build > tEnv.createTemporaryTable("t1", tableDescriptor) > tEnv.createTemporaryTable("t2", tableDescriptor) > val res = tEnv.executeSql("EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) > c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a > is null or a.a.np is null") > res.print() > } > > == Abstract Syntax Tree == > LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)]) > +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))]) >+- LogicalJoin(condition=[=($0, $2)], joinType=[left]) > :- LogicalTableScan(table=[[default_catalog, default_database, t1]]) > +- LogicalTableScan(table=[[default_catalog, default_database, t2]]) > {code} > The top project in the AST is wrong: in `LogicalProject(id=[$0], > c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`, the term > `c1=[CAST($1.np):INTEGER]` corresponding to `COALESCE(a.a.np, b.a.np) c1` is incorrect, > but this works fine when using SQL DDL to create the tables: > {code} > @Test > def testCoalesceOnNestedColumns2(): Unit = { > val tEnv = util.tableEnv > tEnv.executeSql( > s""" > |create temporary table t1 ( > | id int not null, > | a row<np int not null> > |) with ( > | 'connector' = 'datagen' > |) > |""".stripMargin) > tEnv.executeSql( > s""" > |create temporary table t2 ( > | id int not null, > | a row<np int not null> > |) with ( > | 'connector' = 'datagen' > |) > |""".stripMargin) > val res = tEnv.executeSql( > "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, > b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np > is null") > res.print() > } > {code} > From 1.15 on, COALESCE is a new built-in function and the AST looks > correct in 1.15+, while before 1.15 it was rewritten as `case when`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
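The semantic that the plan gets wrong can be stated in plain terms: SQL COALESCE returns the first non-null argument, so `COALESCE(a.a.np, b.a.np)` must stay nullable whenever either argument can be null; here `a.a` itself is nullable even though `np` is NOT NULL inside it, so casting the result to NOT NULL INTEGER is incorrect. A minimal illustrative sketch of that rule in plain Java (not Flink planner code):

```java
import java.util.Arrays;
import java.util.Objects;

public class CoalesceSketch {
    // SQL COALESCE semantics: first non-null argument, or null if all arguments are null.
    static <T> T coalesce(T... values) {
        return Arrays.stream(values).filter(Objects::nonNull).findFirst().orElse(null);
    }

    public static void main(String[] args) {
        // The right side supplies the value when the left nested row is null.
        System.out.println(coalesce(null, 42));   // 42
        // The result is null only when every argument is null, which is why the
        // expression must be typed as nullable when any input can be null.
        System.out.println(coalesce((Object) null, (Object) null)); // null
    }
}
```

This is exactly why the plan's `c1=[CAST($1.np):INTEGER]` (a bare non-null cast of the left side alone) cannot be a correct rewrite of the two-argument COALESCE.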
[jira] [Closed] (FLINK-32585) Filter javax.xml.bind:jaxb-api false positive for Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-32585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen closed FLINK-32585. - Resolution: Fixed master via d7a3b3847dc5c680b32d1997d448b7dac44e529c > Filter javax.xml.bind:jaxb-api false positive for Pulsar connector > -- > > Key: FLINK-32585 > URL: https://issues.apache.org/jira/browse/FLINK-32585 > Project: Flink > Issue Type: Improvement > Components: Build System / CI >Reporter: Zili Chen >Assignee: Zili Chen >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] tisonkun merged pull request #22989: [FLINK-32585] Filter javax.xml.bind:jaxb-api false positive for Pulsar connector
tisonkun merged PR #22989: URL: https://github.com/apache/flink/pull/22989 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tisonkun commented on pull request #22989: [FLINK-32585] Filter javax.xml.bind:jaxb-api false positive for Pulsar connector
tisonkun commented on PR #22989: URL: https://github.com/apache/flink/pull/22989#issuecomment-1634654818 Emm..I think I should merge it now so that the SNAPSHOT can be updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tisonkun commented on pull request #22989: [FLINK-32585] Filter javax.xml.bind:jaxb-api false positive for Pulsar connector
tisonkun commented on PR #22989: URL: https://github.com/apache/flink/pull/22989#issuecomment-1634653552 I'm going to merge this patch tomorrow and check if the Pulsar connector can pass with this patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22993: Bump okio from 1.17.2 to 3.4.0 in /flink-end-to-end-tests/flink-end-to-end-tests-sql
flinkbot commented on PR #22993: URL: https://github.com/apache/flink/pull/22993#issuecomment-1634603561 ## CI report: * c1ebb259b21f391fb12eae4d1dd2647e1dd90350 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dependabot[bot] opened a new pull request, #22993: Bump okio from 1.17.2 to 3.4.0 in /flink-end-to-end-tests/flink-end-to-end-tests-sql
dependabot[bot] opened a new pull request, #22993: URL: https://github.com/apache/flink/pull/22993 Bumps [okio](https://github.com/square/okio) from 1.17.2 to 3.4.0. Changelog sourced from [okio's changelog](https://github.com/square/okio/blob/master/CHANGELOG.md):

Version 3.4.0 2023-07-07
- New: Adapt a Java NIO FileSystem (java.nio.file.FileSystem) as an Okio FileSystem using fileSystem.asOkioFileSystem().
- New: Adapt Android's AssetManager as an Okio FileSystem using AssetFileSystem. This is in the new okio-assetfilesystem module. Android applications should prefer this over FileSystem.RESOURCES as it's faster to load.
- Fix: Don't crash decoding GZIP files when the optional extra data (XLEN) is 32 KiB or larger.
- Fix: Resolve symlinks in FakeFileSystem.canonicalize().
- Fix: Report the correct createdAtMillis in NodeJsFileSystem file metadata. We were incorrectly using ctimeMs, where c means changed, not created.
- Fix: UnsafeCursor is now Closeable.

Version 3.3.0 2023-01-07
- Fix: Don't leak resources when use {} is used with a non-local return. We introduced this performance and stability bug by not considering that non-local returns execute neither the return nor catch control flows.
- Fix: Use a sealed interface for BufferedSink and BufferedSource. These were never intended for end-users to implement, and we're happy that Kotlin now allows us to express that in our API.
- New: Change internal locks from synchronized to ReentrantLock and Condition. We expect this to help when using Okio with Java virtual threads (Project Loom).
- Upgrade: Kotlin 1.8.0.

Version 3.2.0 2022-06-26
- Fix: Configure the multiplatform artifact (com.squareup.okio:okio:3.x.x) to depend on the JVM artifact (com.squareup.okio:okio-jvm:3.x.x) for Maven builds. This should work around an issue where Maven doesn't interpret Gradle metadata.
- Fix: Change CipherSource and CipherSink to recover if the cipher doesn't support streaming. This should work around a crash with AES/GCM ciphers on Android.
- New: Enable compatibility with non-hierarchical projects.

Version 3.1.0 2022-04-19
- Upgrade: Kotlin 1.6.20.
- New: Support Hierarchical project structure. If you're using Okio in a multiplatform project please upgrade your project to Kotlin 1.6.20 (or newer) to take advantage of this. With hierarchical projects it's easier to use properties like FileSystem.SYSTEM that ... (truncated)

Commits
- a161b07 Prepare for release 3.4.0.
- c5f462b Copyright to files in build-support (#1285)
- f21714d Upgrade Gradle and JMH (#1283)
- 5f5db4a Merge pull request #1284 from square/renovate/com.google.jimfs
- 8af8d2a Update dependency com.google.jimfs:jimfs to v1.3.0
- b64c198 Update dependency com.vanniktech:gradle-maven-publish-plugin to v0.25.3 (#1282)
- ea82713 Merge pull request #1281 from square/renovate/gradle-7.x
- 3569daa Update dependency gradle to v7.6.2
- e937a50 Merge pull request #1277 from sifmelcara/fix-int-sign-conversion
- 81bce1a Fix a bug where xlen larger than 0x7fff was rejected (#1280)
- Additional commits viewable in the [compare view](https://github.com/square/okio/compare/okio-parent-1.17.2...parent-3.4.0)

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.squareup.okio:okio&package-manager=maven&previous-version=1.17.2&new-version=3.4.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any
[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates
[ https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742887#comment-17742887 ] Gyula Fora commented on FLINK-32589: I think this would be a good improvement, but we have to consider and document how user-specified overrides will interact with this: how the user can override the autoscaler-set parallelisms, or clear them completely. > Carry over parallelism overrides to prevent users from clearing them on > updates > --- > > Key: FLINK-32589 > URL: https://issues.apache.org/jira/browse/FLINK-32589 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Priority: Major > > The autoscaler currently sets the parallelism overrides via the Flink config > {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec > updates, special care needs to be taken in order to carry over existing > overrides. Otherwise the job will reset to the default parallelism > configuration. Users shouldn't have to deal with this. Instead, whenever a > new spec is posted which does not contain the overrides, the operator should > automatically apply the last-used overrides (if autoscaling is enabled). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates
[ https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742886#comment-17742886 ] Gyula Fora commented on FLINK-32589: So just to be clear, do you suggest adding a new status field with the autoscaler overrides and always applying them to the spec? This way we could actually get rid of any spec modification done by the autoscaler module and let the reconciler simply apply it without updating it in k8s.
[jira] [Updated] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high
[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-32420: --- Release Note: This performance improvement would be good to mention in the release blog post. As proven by the micro-benchmarks (screenshots attached in the ticket), with 5000 subtasks, the time to calculate the watermark alignment on the JobManager improved by a factor of 76x (7664%). Previously such large jobs were actually at high risk of overloading the JobManager; now that's far less likely to happen. > Watermark aggregation performance is poor when watermark alignment is enabled > and parallelism is high > - > > Key: FLINK-32420 > URL: https://issues.apache.org/jira/browse/FLINK-32420 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Affects Versions: 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > Attachments: Screenshot 2023-07-13 at 17.19.11.png, Screenshot > 2023-07-13 at 17.19.24.png > > > The > [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644] > method will find the smallest watermark of all keys as the > aggregatedWatermark. > However, the time complexity of the aggregate method in a WatermarkAlignment > updateInterval cycle is O(n*n), because: > * Every subtask reports its latest watermark to the SourceCoordinator once per > WatermarkAlignment updateInterval cycle > * The SourceCoordinator updates the smallest watermark from all subtasks on > each report > In general, the key is the subtaskIndex, so the number of keys equals the parallelism. > When the parallelism is high, the watermark aggregation performance will be > poor. > h1. Performance Test: > The parallelism is 1, each subtask reports 20 watermarks, and the > aggregate method takes 18.921s. Almost every round takes 950 ms. > * If the watermarkAlignment updateInterval is 1s, the SourceCoordinator will be > very busy. > * If it's less than 1s, the watermark aggregation will be delayed. > I have finished the POC for the performance improvement, and reduced the watermark > aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6 > ms.
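The idea behind such a fix can be sketched independently of Flink's actual code: instead of re-scanning every subtask's watermark on each report (O(n) per report, so O(n*n) per cycle), keep the latest value per subtask together with a sorted multiset of values, so each report updates the aggregated minimum in O(log n). This is a minimal, hypothetical illustration of the technique, not the POC from the ticket:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch of incremental minimum-watermark aggregation (illustrative only,
 * not Flink's SourceCoordinator implementation).
 */
public class IncrementalMinWatermark {
    // latest reported watermark per subtask
    private final Map<Integer, Long> perSubtask = new HashMap<>();
    // sorted multiset of current watermark values; counts handle duplicates
    private final TreeMap<Long, Integer> valueCounts = new TreeMap<>();

    /** Record one subtask's report in O(log n) and return the new aggregated minimum. */
    public long update(int subtask, long watermark) {
        Long old = perSubtask.put(subtask, watermark);
        if (old != null) {
            // remove the subtask's previous value from the multiset
            valueCounts.merge(old, -1, Integer::sum);
            if (valueCounts.get(old) == 0) {
                valueCounts.remove(old);
            }
        }
        valueCounts.merge(watermark, 1, Integer::sum);
        // smallest watermark across all subtasks
        return valueCounts.firstKey();
    }
}
```

With this structure, a full updateInterval cycle with n subtasks costs O(n log n) instead of O(n*n), which matches the order-of-magnitude improvement reported in the ticket.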
[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates
[ https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742865#comment-17742865 ] Maximilian Michels commented on FLINK-32589: We handle this in our application stack, but I think this is something that the operator should do for the user. Most users probably do a POST/PUT on the spec, which will clear any overrides. This will come as a surprise to users. It is really an implementation detail that the overrides are handled via an undocumented Flink configuration option. The overrides are more part of the application's status than actual configuration. Users can turn off autoscaling, which should clear any overrides.
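The carry-over behavior proposed in the ticket can be sketched as a small, hypothetical helper (the class and method names are illustrative, not operator code): if a newly posted spec omits the `pipeline.jobvertex-parallelism-overrides` key and autoscaling is enabled, the last-used overrides are copied into the merged config.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the proposed carry-over of autoscaler parallelism
 * overrides when the user posts a new spec without them.
 */
public class OverrideCarryOver {
    public static final String OVERRIDES_KEY = "pipeline.jobvertex-parallelism-overrides";

    public static Map<String, String> mergeSpecConfig(
            Map<String, String> newSpecConfig,
            Map<String, String> lastUsedConfig,
            boolean autoscalingEnabled) {
        Map<String, String> merged = new HashMap<>(newSpecConfig);
        // Carry over the last-used overrides only when autoscaling is on and
        // the user did not set the key themselves.
        if (autoscalingEnabled
                && !merged.containsKey(OVERRIDES_KEY)
                && lastUsedConfig.containsKey(OVERRIDES_KEY)) {
            merged.put(OVERRIDES_KEY, lastUsedConfig.get(OVERRIDES_KEY));
        }
        return merged;
    }
}
```

Note how this also reflects the concern raised in the comments: with autoscaling disabled (or with the key explicitly set by the user), nothing is carried over, so the user retains a way to clear the overrides.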
[GitHub] [flink] XComp commented on a diff in pull request #22987: [FLINK-32583][rest] Fix deadlock in RestClient
XComp commented on code in PR #22987: URL: https://github.com/apache/flink/pull/22987#discussion_r1262773979 ## flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java: ## @@ -501,6 +501,22 @@ private CompletableFuture submitRequest( } }); +// [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may +// mean the executor service Netty is using has shut down, in which case the above listener +// to complete channelFuture will never run +if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) { Review Comment: Should we add a comment why we're not handling the success case? Essentially, we're trying to work around a bug in the netty code, aren't we? ``` /** * Adds the specified listener to this future. The * specified listener is notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listener is notified immediately. */ ``` The JavaDoc of `Future.addListener` states that the listener would be informed if the future is already completed (which includes the successful and the exceptional case). But that doesn't match the implementation, apparently. Right now, we're missing the `isCancelled` and `isSuccess` case handling. It's not clear whether these cases can actually happen. But since we're (at least) covering the success case in the listener implementation above, wouldn't it be reasonable to cover it here as well? ## flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java: ## @@ -207,6 +209,42 @@ public void testRestClientClosedHandling() throws Exception { } } +/** + * Tests that the futures returned by {@link RestClient} fail immediately if the client is + * already closed. 
+ * + * See FLINK-32583 + */ +@Test +public void testCloseClientBeforeRequest() throws Exception { +// Note that the executor passed to the RestClient constructor is not the same as the +// executor used by Netty +try (final RestClient restClient = +new RestClient(new Configuration(), Executors.directExecutor())) { +// Intentionally close the client (and thus also the executor used by Netty) +restClient.close(); + +CompletableFuture future = +restClient.sendRequest( +unroutableIp, +80, +new TestMessageHeaders(), +EmptyMessageParameters.getInstance(), +EmptyRequestBody.getInstance()); + +// Call get() on the future with a timeout of 0 so we can test that the exception thrown +// is not a TimeoutException, which is what would be thrown if restClient were not +// already closed +final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS); + +final ExecutionException executionException = +assertThrows(ExecutionException.class, getFuture); +final Throwable throwable = ExceptionUtils.stripExecutionException(executionException); +assertThat(throwable, instanceOf(IOException.class)); +assertThat(throwable.getMessage(), containsString("RestClient is closed")); Review Comment: ```suggestion final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause(); assertThat(cause, instanceOf(IOException.class)); assertThat(cause.getMessage(), containsString("RestClient is closed")); ``` nit: What about extracting the cause right away and doing the assertions on the cause? That makes the test more restrictive. We shouldn't expect a multiple-layer stacktrace here, should we? 
## flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java: ## @@ -501,6 +501,22 @@ private CompletableFuture submitRequest( } }); +// [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may +// mean the executor service Netty is using has shut down, in which case the above listener +// to complete channelFuture will never run +if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) { +final String message; +if (!isRunning.get()) { Review Comment: nit: Could we invert the if condition? ...just for readability purposes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
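The defensive pattern under review can be illustrated in isolation (hypothetical names, plain `CompletableFuture` instead of Netty's futures): when a callback is handed to an executor that may already be shut down, the callback can be silently dropped, so the caller re-checks the state afterwards and completes the result future itself rather than leaving callers blocked forever.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/**
 * Hypothetical sketch of the defensive check discussed in the review
 * (not Flink's RestClient): complete the result future explicitly when
 * the executor can no longer run the registered callback.
 */
public class CallbackFallback {
    public static CompletableFuture<String> submit(ExecutorService executor) {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            // In the real bug, this is Netty's connect listener; here it is
            // just a task that completes the future.
            executor.execute(() -> result.complete("connected"));
        } catch (RejectedExecutionException e) {
            // Swallowed on purpose: handled by the defensive check below.
        }
        // Defensive check: if the executor was already shut down, the task
        // above will never run, so fail the future explicitly instead of
        // leaving it unresolved (the deadlock described in FLINK-32583).
        if (executor.isShutdown() && !result.isDone()) {
            result.completeExceptionally(new IOException("client is closed"));
        }
        return result;
    }
}
```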
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #630: [hotfix] Fix in-place rescaling for removed overrides
gyfora merged PR #630: URL: https://github.com/apache/flink-kubernetes-operator/pull/630
[jira] [Commented] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high
[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742849#comment-17742849 ] Rui Fan commented on FLINK-32420: Thanks [~pnowojski] for updating the performance numbers here
[jira] [Commented] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high
[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742846#comment-17742846 ] Piotr Nowojski commented on FLINK-32420: Yeah... no need for further comments :) Thanks [~fanrui] ! !Screenshot 2023-07-13 at 17.19.11.png|width=600! !Screenshot 2023-07-13 at 17.19.24.png|width=600!
[jira] [Updated] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high
[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-32420: --- Attachment: Screenshot 2023-07-13 at 17.19.11.png Screenshot 2023-07-13 at 17.19.24.png
[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates
[ https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742840#comment-17742840 ] Gyula Fora commented on FLINK-32589: I think this is not necessarily something that should be handled on the operator side, otherwise the user has no way of actually removing the overrides. If they use kubectl apply (server-side apply) without the override config defined, the configs would be naturally merged and carried over. By replacing, the user can remove it by not setting the override.
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262718932 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ## @@ -0,0 +1,1188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence at backend. */ +@PublicEvolving +public class GlueCa
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22861: [FLINK-32387][runtime] InputGateDeploymentDescriptor uses cache to avoid deserializing shuffle descriptors multiple times
wanglijie95 commented on code in PR #22861: URL: https://github.com/apache/flink/pull/22861#discussion_r1262696361 ## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java: ## @@ -139,53 +139,67 @@ public IndexRange getConsumedSubpartitionIndexRange() { return consumedSubpartitionIndexRange; } -public void loadBigData(@Nullable PermanentBlobService blobService, JobID jobId) +public ShuffleDescriptor[] getShuffleDescriptors() { +if (inputChannels == null) { +throw new IllegalStateException("InputChannel should not be null."); +} +return inputChannels; +} + +public void loadBigDataAndDeserializeShuffleDescriptors( +@Nullable PermanentBlobService blobService, +JobID jobId, +ShuffleDescriptorsCache shuffleDescriptorsCache) throws IOException { -for (int i = 0; i < serializedInputChannels.size(); i++) { -MaybeOffloaded shuffleDescriptors = -serializedInputChannels.get(i); -if (shuffleDescriptors instanceof Offloaded) { -PermanentBlobKey blobKey = -((Offloaded) shuffleDescriptors) -.serializedValueKey; +try { +if (inputChannels == null) { +inputChannels = new ShuffleDescriptor[numberOfInputChannels]; +} -Preconditions.checkNotNull(blobService); +for (MaybeOffloaded serializedShuffleDescriptors : Review Comment: Maybe there is some misunderstanding in our offline discussion, I think it is ok to introduce `ShuffleDescriptorGroup`(or `ShuffleDescriptorList`), which helps to understand the serialization process (The shuffle descriptors in the same`ShuffleDescriptorGroup` are serialized together) ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; + +/** Cache of shuffle descriptors in TaskExecutor. */ +public interface ShuffleDescriptorsCache { +/** + * Start cache manager. + * + * @param mainThreadExecutor of main thread executor. + */ +void start(ComponentMainThreadExecutor mainThreadExecutor); + +/** Stop cache manager. */ +void stop(); + +/** + * Get shuffle descriptors in cache. + * + * @param blobKey identify the shuffle descriptors + * @return shuffle descriptors in cache if exists, otherwise null + */ +TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] get(PermanentBlobKey blobKey); + +/** + * Put shuffle descriptors to cache. + * + * @param jobId of job + * @param blobKey identify the shuffle descriptors + * @param shuffleDescriptorAndIndices shuffle descriptors to cache + */ +void put( +JobID jobId, +PermanentBlobKey blobKey, +TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] +shuffleDescriptorAndIndices); + +/** + * Clear all cache of the Job. 
+ * + * @param jobId of job + */ +void clearCacheOfJob(JobID jobId); Review Comment: Maybe `clearCacheForJob` ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java: ## @@ -68,6 +69,9 @@ public class TaskManagerServices { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); +private static final Duration SHUFFLE_DESCRIPTORS_CACHE_TIMEOUT = Duration.ofSeconds(300); +private static final int SHUFFLE_DESCRIPTORS_CACHE_SIZE_LIMIT = 100; Review Comment: Is this the num of `shuffle descriptors`? or the num of `Blob/ShuffleDescriptorGroup`? ## flink-runtime/src/main/java/org/apache/f
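The contract of the proposed `ShuffleDescriptorsCache` can be sketched with a simplified, hypothetical implementation: entries are keyed by blob key and additionally tracked per job, so that clearing a job evicts everything it cached. (Size limits, expiry, the main-thread executor, and Flink's `PermanentBlobKey`/`ShuffleDescriptorAndIndex` types are omitted; generic type parameters stand in for them.)

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Simplified sketch of the per-job shuffle-descriptor cache contract
 * discussed above (illustrative only, not Flink's implementation).
 *
 * @param <K> stand-in for the blob key type
 * @param <V> stand-in for the cached shuffle descriptors
 */
public class SimpleShuffleDescriptorsCache<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    // tracks which blob keys each job contributed, to support eviction by job
    private final Map<String, Set<K>> keysPerJob = new HashMap<>();

    /** Returns the cached descriptors for the blob key, or null if absent. */
    public V get(K blobKey) {
        return entries.get(blobKey);
    }

    /** Caches descriptors under the blob key and records the owning job. */
    public void put(String jobId, K blobKey, V descriptors) {
        entries.put(blobKey, descriptors);
        keysPerJob.computeIfAbsent(jobId, id -> new HashSet<>()).add(blobKey);
    }

    /** Evicts every entry that belongs to the given job. */
    public void clearCacheOfJob(String jobId) {
        Set<K> keys = keysPerJob.remove(jobId);
        if (keys != null) {
            keys.forEach(entries::remove);
        }
    }
}
```

The naming nit in the review (`clearCacheOfJob` vs. `clearCacheForJob`) is purely cosmetic; the per-job key tracking is what makes job-scoped eviction cheap.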
[jira] [Comment Edited] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742834#comment-17742834 ] jasonliangyc edited comment on FLINK-32579 at 7/13/23 3:09 PM: --- Hi [~337361...@qq.com]. Yes, I'm using the 1.17.0, but i also tried the 1.17.1, same issues. I can reproduce the problems by using the attached [^test_case.sql], could you try it again by following the setups in that file? thanks. !image-2023-07-13-22-35-35-696.png|width=674,height=164! For the second problem, it means: if {code:java} where p.name = '??' {code} then the value of column 'name' return '??' if {code:java} where p.name = '+' {code} then the value of column 'name' return '+' but actually there are no such data in the 'products' table, it is weird that it always return the same value as the one in the . {code:java} select cdc.order_id, cdc.order_date, cdc.customer_name, cdc.price, p.name FROM orders AS cdc left JOIN products FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id where p.name = '??'{code} !image-2023-07-13-22-38-16-709.png|width=663,height=120! !image-2023-07-13-22-43-45-957.png|width=662,height=141! was (Author: JIRAUSER301313): Hi [~337361...@qq.com]. Yes, I'm using the 1.17.0, but i also tried the 1.17.1, same issues. I can reproduce the problems by using the attached [^test_case.sql], could you try it again by following the setups in that file? thanks. !image-2023-07-13-22-35-35-696.png|width=674,height=164! For the second problem, it means: if *where p.name = '??'* ,then the value of column 'name' return '??' if {*}where p.name = '+'{*}, then the value of column 'name' return '+' but actually there are no such data in the 'products' table, it is weird that it always return the same value as the one in the . 
{code:java} select cdc.order_id, cdc.order_date, cdc.customer_name, cdc.price, p.name FROM orders AS cdc left JOIN products FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id where p.name = '??'{code} !image-2023-07-13-22-38-16-709.png|width=663,height=120! !image-2023-07-13-22-43-45-957.png|width=662,height=141! > The filter criteria on the lookup table of Lookup join has no effect > - > > Key: FLINK-32579 > URL: https://issues.apache.org/jira/browse/FLINK-32579 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.17.0, 1.17.1 >Reporter: jasonliangyc >Priority: Major > Attachments: image-2023-07-12-09-31-18-261.png, > image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, > image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, > image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, > image-2023-07-13-22-43-45-957.png, test_case.sql > > > *1.* I joined two tables using a lookup join as in the query below in sql-client; the filter criterion (p.name = '??') didn't show up in the execution > detail, and rows were returned based on only one condition (cdc.product_id = > p.id). > {code:java} > select > cdc.order_id, > cdc.order_date, > cdc.customer_name, > cdc.price, > p.name > FROM orders AS cdc > left JOIN products > FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and > cdc.product_id = p.id > ; {code} > !image-2023-07-12-09-31-18-261.png|width=657,height=132! > > *2.* It showed weird results when I changed the query as below, because there is no data in the table (products) where the value of column 'name' is > '??', and the execution detail didn't show the where criteria. > {code:java} > select > cdc.order_id, > cdc.order_date, > cdc.customer_name, > cdc.price, > p.name > FROM orders AS cdc > left JOIN products > FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id > where p.name = '??' 
> ; {code} > !image-2023-07-12-09-42-59-231.png|width=684,height=102! > !image-2023-07-12-09-47-31-397.png|width=685,height=120! > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262700429 ## flink-catalog-aws/flink-catalog-aws-glue/pom.xml: ## @@ -0,0 +1,125 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + +4.0.0 + + +org.apache.flink +flink-catalog-aws-parent +4.2-SNAPSHOT + + +flink-catalog-aws-glue +Flink : Catalog : AWS : Glue + + +jar + + + + +org.apache.flink +flink-table-api-java +${flink.version} +provided + + + +org.apache.flink +flink-connector-aws-base +${project.version} + + + +software.amazon.awssdk +glue + + + +software.amazon.awssdk +apache-client + + + +software.amazon.awssdk +url-connection-client + + + + + + +org.apache.flink +flink-architecture-tests-test +test + + + +org.apache.flink +flink-table-common +${flink.version} +test-jar +test + + + +org.apache.flink +flink-table-api-java +${flink.version} +test-jar +test + + + +org.projectlombok +lombok +1.18.22 +test + + + + + + + +org.apache.maven.plugins +maven-jar-plugin + + + +test-jar + + + +META-INF/LICENSE +META-INF/NOTICE + + + + + + + + Review Comment: It was intended for e2e test for catalog. For now removing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742834#comment-17742834 ] jasonliangyc commented on FLINK-32579: -- Hi [~337361...@qq.com]. Yes, I'm using 1.17.0, but I also tried 1.17.1 with the same issues. I can reproduce the problems by using the attached [^test_case.sql], could you try it again by following the setup in that file? Thanks. !image-2023-07-13-22-35-35-696.png|width=674,height=164! For the second problem, it means: if <{*}where p.name = '??'{*}>, then the value of column 'name' returns '??' if <{*}where p.name = '+'{*}>, then the value of column 'name' returns '+' but actually there is no such data in the 'products' table; it is weird that it always returns the same value as the one in the where clause. {code:java} select cdc.order_id, cdc.order_date, cdc.customer_name, cdc.price, p.name FROM orders AS cdc left JOIN products FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id where p.name = '??'{code} !image-2023-07-13-22-38-16-709.png|width=663,height=120! !image-2023-07-13-22-43-45-957.png|width=662,height=141! 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
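One way to check the reporter's observation — that the predicate on the lookup table disappears from the "execution detail" — is to inspect the optimized plan with Flink SQL's `EXPLAIN` statement. The sketch below reuses the tables from the attached test case, so the exact plan output depends on that setup; the thing to look for is whether the `p.name` predicate appears on the `LookupJoin` operator or is dropped.

```sql
-- Show the optimized plan for the problematic query; check whether the
-- lookup-table filter (p.name = '??') survives into the LookupJoin node.
EXPLAIN
SELECT
  cdc.order_id,
  cdc.order_date,
  cdc.customer_name,
  cdc.price,
  p.name
FROM orders AS cdc
LEFT JOIN products FOR SYSTEM_TIME AS OF cdc.proc_time AS p
  ON cdc.product_id = p.id
WHERE p.name = '??';
```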
[GitHub] [flink] akalash commented on a diff in pull request #22761: [FLINK-32298][network] Fix the bug that totalWrittenBytes of BufferWritingResultPartition misses some data
akalash commented on code in PR #22761: URL: https://github.com/apache/flink/pull/22761#discussion_r1262673283 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java: ## @@ -65,7 +65,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { private TimerGauge hardBackPressuredTimeMsPerSecond = new TimerGauge(); -private long totalWrittenBytes; +protected long totalWrittenBytes; Review Comment: Ideally, `totalWrittenBytes` should be changed in only one class, `BufferWritingResultPartition`. So maybe we should forbid `inherited classes` from writing to `subpartitions` directly. But honestly, I don't know how to do that easily, since the current implementation of the `inherited classes` uses `subpartitions` a lot and we cannot change the visibility of `subpartitions` from `protected` to `private`. I mean, we can create an `addToSubpartition` method in `BufferWritingResultPartition` and use it everywhere (and maybe that will be better than now), but it still doesn't protect us from possible future bugs if somebody decides to use `subpartitions#add` directly in `inherited classes`. Anyway, if you don't think it will make things worse, maybe we should indeed create the `add` (or `addToSubpartition`) method in `BufferWritingResultPartition` for now and use it everywhere. At least it will reduce the number of places where we update `totalWrittenBytes`. Unfortunately, I haven't come up with any other solution so far. 
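The single-write-path idea discussed in that comment can be sketched as follows. This is a hypothetical simplification — plain integer lists stand in for the real `ResultSubpartition` and buffer types — not Flink's actual `BufferWritingResultPartition`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: every subpartition write goes through one method. */
public class PartitionSketch {
    // In the real class these are ResultSubpartition objects; plain lists here.
    private final List<List<Integer>> subpartitions = new ArrayList<>();
    private long totalWrittenBytes; // updated in exactly one place below

    public PartitionSketch(int numSubpartitions) {
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions.add(new ArrayList<>());
        }
    }

    /**
     * The proposed addToSubpartition: subclasses call this instead of
     * touching subpartitions directly, so the byte counter cannot be missed.
     */
    protected void addToSubpartition(int subpartitionIndex, int bufferSizeBytes) {
        subpartitions.get(subpartitionIndex).add(bufferSizeBytes);
        totalWrittenBytes += bufferSizeBytes;
    }

    public long getTotalWrittenBytes() {
        return totalWrittenBytes;
    }
}
```

As the reviewer notes, this funnels all counter updates through one method but cannot prevent a subclass with direct access to `subpartitions` from bypassing it; only making the field `private` would enforce that.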
## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java: ## @@ -190,51 +195,60 @@ public void recover( int oldSubtaskIndex, BufferWithContext bufferWithContext) throws IOException { -try (BufferBuilder bufferBuilder = bufferWithContext.context) { -try (BufferConsumer bufferConsumer = -bufferBuilder.createBufferConsumerFromBeginning()) { -bufferBuilder.finish(); -if (bufferConsumer.isDataAvailable()) { -final List channels = -getMappedChannels(subpartitionInfo); -for (final CheckpointedResultSubpartition channel : channels) { -// channel selector is created from the downstream's point of view: the -// subtask of downstream = subpartition index of recovered buffer -final SubtaskConnectionDescriptor channelSelector = -new SubtaskConnectionDescriptor( -subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex); -channel.addRecovered( - EventSerializer.toBufferConsumer(channelSelector, false)); -channel.addRecovered(bufferConsumer.copy()); -} -} +try (BufferBuilder bufferBuilder = bufferWithContext.context; +BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning()) { +bufferBuilder.finish(); +if (!bufferConsumer.isDataAvailable()) { +return; +} +final List mappedSubpartitions = +getMappedSubpartitions(subpartitionInfo); +for (final ResultSubpartitionInfo mappedSubpartition : mappedSubpartitions) { +// channel selector is created from the downstream's point of view: the +// subtask of downstream = subpartition index of recovered buffer +final SubtaskConnectionDescriptor channelSelector = +new SubtaskConnectionDescriptor( +subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex); +CheckpointedResultPartition checkpointedResultPartition = + getCheckpointedResultPartition(mappedSubpartition.getPartitionIdx()); +checkpointedResultPartition.addRecovered( Review Comment: good. It's better now -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jasonliangyc updated FLINK-32579: - Attachment: test_case.sql -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jasonliangyc updated FLINK-32579: - Attachment: image-2023-07-13-22-43-45-957.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jasonliangyc updated FLINK-32579: - Attachment: image-2023-07-13-22-43-24-213.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262663157 ## flink-catalog-aws/flink-catalog-aws-glue/pom.xml: ## @@ -0,0 +1,125 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + +4.0.0 + + +org.apache.flink +flink-catalog-aws-parent +4.2-SNAPSHOT + + +flink-catalog-aws-glue +Flink : Catalog : AWS : Glue + + +jar + + + + +org.apache.flink +flink-table-api-java +${flink.version} +provided + + + +org.apache.flink +flink-connector-aws-base +${project.version} + + + +software.amazon.awssdk +glue + + + +software.amazon.awssdk +apache-client + + + +software.amazon.awssdk +url-connection-client + + Review Comment: removed url client . it was redundant -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jasonliangyc updated FLINK-32579: - Attachment: image-2023-07-13-22-38-16-709.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jasonliangyc updated FLINK-32579: - Attachment: image-2023-07-13-22-35-35-696.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262655614 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueFunctionOperator.java: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.resource.ResourceUri; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.PrincipalType; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; +import 
software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Utilities for Glue catalog Function related operations. */ +@Internal +public class GlueFunctionOperator extends GlueOperator { + +private static final Logger LOG = LoggerFactory.getLogger(GlueFunctionOperator.class); + +public GlueFunctionOperator(String catalogName, GlueClient glueClient, String glueCatalogId) { +super(catalogName, glueClient, glueCatalogId); +} + +/** + * Create a function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of the function + * @param function Flink function to be created + * @throws CatalogException in case of any runtime exception + */ +public void createGlueFunction(ObjectPath functionPath, CatalogFunction function) +throws CatalogException, FunctionAlreadyExistException { + +UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, function); +CreateUserDefinedFunctionRequest.Builder createUDFRequest = +CreateUserDefinedFunctionRequest.builder() +.databaseName(functionPath.getDatabaseName()) +.catalogId(getGlueCatalogId()) +.functionInput(functionInput); +try { +CreateUserDefinedFunctionResponse response = + glueClient.createUserDefinedFunction(createUDFRequest.build()); +GlueUtils.validateGlueResponse(response); +LOG.info(String.format("Function created. %s", functionPath.getFullName())); Review Comment: ack, modified code to use native logger in the PR. -- This is an automated message from the Apache Git Service. To respond to the message, p
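The logging change acknowledged above — replacing `LOG.info(String.format("Function created. %s", ...))` with SLF4J's parameterized form `LOG.info("Function created. {}", ...)` — also avoids assembling the message string when INFO is disabled. A minimal self-contained sketch of that difference (the counter-based logger below is a hypothetical stand-in, not the SLF4J API):

```java
/** Stand-in demo: eager String.format pays the formatting cost even when the
 *  log level is off; parameterized logging defers it behind the level check. */
public class LazyLoggingDemo {
    static int formatCalls = 0;

    /** Counts how often a message actually gets assembled. */
    static String format(String template, Object arg) {
        formatCalls++;
        return template.replace("{}", String.valueOf(arg));
    }

    /** Eager style: caller formats first, the logger may then discard the string. */
    static void infoEager(boolean infoEnabled, String alreadyFormatted) {
        if (infoEnabled) {
            System.out.println(alreadyFormatted);
        }
    }

    /** Parameterized style: formatting happens only if the level is enabled. */
    static void infoLazy(boolean infoEnabled, String template, Object arg) {
        if (infoEnabled) {
            System.out.println(format(template, arg));
        }
    }

    static String demo() {
        formatCalls = 0;
        infoEager(false, format("Function created. {}", "db.fn")); // formats anyway
        int eager = formatCalls;
        infoLazy(false, "Function created. {}", "db.fn");          // skips formatting
        return "eager=" + eager + " lazy=" + (formatCalls - eager);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // eager=1 lazy=0
    }
}
```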
[jira] [Created] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates
Maximilian Michels created FLINK-32589: -- Summary: Carry over parallelism overrides to prevent users from clearing them on updates Key: FLINK-32589 URL: https://issues.apache.org/jira/browse/FLINK-32589 Project: Flink Issue Type: Improvement Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels The autoscaler currently sets the parallelism overrides via the Flink config {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts specs updates, special care needs to be taken in order to carry over existing overrides. Otherwise the job will reset to the default parallelism configuration. Users shouldn't have to deal with this. Instead, whenever a new spec is posted which does not contain the overrides, the operator should automatically apply the last-used overrides (if autoscaling is enabled). -- This message was sent by Atlassian Jira (v8.20.10#820010)
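The carry-over FLINK-32589 describes can be sketched as a config merge: if a newly posted spec does not set `pipeline.jobvertex-parallelism-overrides` and autoscaling is enabled, re-apply the last deployed value. This is only an illustrative sketch — `mergeSpec` and the surrounding names are hypothetical, not the operator's API:

```java
import java.util.HashMap;
import java.util.Map;

/** Hedged sketch of the proposed carry-over: a spec update that omits the
 *  parallelism overrides should not silently reset the job's parallelism. */
public class OverrideCarryOver {
    static final String KEY = "pipeline.jobvertex-parallelism-overrides";

    static Map<String, String> mergeSpec(
            Map<String, String> newSpecConf,
            Map<String, String> lastDeployedConf,
            boolean autoscalerEnabled) {
        Map<String, String> merged = new HashMap<>(newSpecConf);
        // Only fill in the overrides when the user did not set them explicitly.
        if (autoscalerEnabled && !merged.containsKey(KEY) && lastDeployedConf.containsKey(KEY)) {
            merged.put(KEY, lastDeployedConf.get(KEY));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> last = Map.of(KEY, "vertex1:4,vertex2:2");
        Map<String, String> posted = Map.of("state.backend", "rocksdb");
        System.out.println(mergeSpec(posted, last, true).get(KEY)); // vertex1:4,vertex2:2
    }
}
```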
[GitHub] [flink] XComp commented on pull request #22988: [FLINK-32157] Replaces LeaderConnectionInfo with LeaderInformation
XComp commented on PR #22988: URL: https://github.com/apache/flink/pull/22988#issuecomment-1634351279 (sorry for not approving the PR before merging :innocent: ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-32157) Replaces LeaderConnectionInfo with LeaderInformation
[ https://issues.apache.org/jira/browse/FLINK-32157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-32157. --- Fix Version/s: 1.18.0 Resolution: Fixed master: f37d41cf557e9acd113a063dbee442a3a92bf09e > Replaces LeaderConnectionInfo with LeaderInformation > > > Key: FLINK-32157 > URL: https://issues.apache.org/jira/browse/FLINK-32157 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Matthias Pohl >Assignee: Jiadong Lu >Priority: Major > Labels: pull-request-available, starter > Fix For: 1.18.0 > > > {{LeaderConnectionInfo}} and {{LeaderInformation}} have the same purpose. > {{LeaderInformation}} could substitute any occurrences of > {{LeaderConnectionInfo}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp merged pull request #22988: [FLINK-32157] Replaces LeaderConnectionInfo with LeaderInformation
XComp merged PR #22988: URL: https://github.com/apache/flink/pull/22988
[GitHub] [flink] XComp commented on pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
XComp commented on PR #22010: URL: https://github.com/apache/flink/pull/22010#issuecomment-1634336399 > In https://github.com/apache/flink/pull/21971 we added SequenceGeneratorTest, I think we can add the corresponding test method in https://github.com/apache/flink/pull/21971 I'm not a big fan of adding tests that belong in a PR into a separate PR. You could just create the test class in this PR with the set of tests that fit this PR. PR #21971 should then add additional tests related to its changes. The conflict resolution shouldn't be too complex because you're adding independent test methods. :thinking:
[GitHub] [flink] luoyuxia commented on a diff in pull request #22939: [FLINK-32474][table] Support time travel in table planner
luoyuxia commented on code in PR #22939: URL: https://github.com/apache/flink/pull/22939#discussion_r1262460439 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ## @@ -380,7 +380,28 @@ public Optional getTable(ObjectIdentifier objectIdentifier resolveCatalogBaseTable(temporaryTable); return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable)); } else { -return getPermanentTable(objectIdentifier); +return getPermanentTable(objectIdentifier, Optional.empty()); +} +} + +/** + * Retrieves a fully qualified table with a specific time. If the path is not yet fully Review Comment: nit: If the path is not yet fully qualified, use {@link #qualifyIdentifier(UnresolvedIdentifier)} first. ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SnapshotScope.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.calcite.sql.validate; + +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.sql.SqlNode; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +/** Represent the snapshot of the {@link SqlValidatorScope} */ +public class SnapshotScope extends DelegatingScope { +private final SqlValidatorSnapshot sqlValidatorSnapshot; + +public SnapshotScope(SqlValidatorScope parent, SchemaVersion schemaVersion) { +super(parent); +this.sqlValidatorSnapshot = +new SqlValidatorSnapshot((SqlValidatorImpl) parent.getValidator(), schemaVersion); +} + +@Override +public void resolveTable( +List names, SqlNameMatcher nameMatcher, Path path, Resolved resolved) { +// In the time travel case, the parent of the ScopeSnapshot will always be CatalogScope Review Comment: ```suggestion // In the time travel case, the parent of the ScopeSnapshot will always be CatalogScope ``` ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ## @@ -446,12 +467,24 @@ public Optional getPartition( return Optional.empty(); } -private Optional getPermanentTable(ObjectIdentifier objectIdentifier) { +private Optional getPermanentTable( +ObjectIdentifier objectIdentifier, Optional timestamp) { Review Comment: Got warning from my IDE `'Optional' used as type for parameter 'timestamp' `. How about using `@Nullable Long timestamp`? ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/IdentifierSnapshotNamespace.java: ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.calcite.sql.validate; + +import org.apache.calcite.schema.SchemaVersion; + +/** Represent the snapshot of the {@link IdentifierNamespace} */ Review Comment: Sorry for revisiting it again. But I found I'm still confused about the comments. Maybe we can follow the comment `Common base class for DML statement namespaces` for `DmlNamespace`. How about changing the comment to `A namespace for the statement with {@code FOR SYSTEM_TIME AS OF TIMESTAMP} clause.` ? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalcite
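On the `Optional<Long> timestamp` parameter flagged above: the usual Java guidance is to keep `Optional` for return values and use a nullable parameter (or an overload) internally. A toy sketch of that shape — `SnapshotLookup` and its methods are invented for illustration, not the real `CatalogManager` API:

```java
import java.util.Optional;

/** Sketch of the review suggestion: keep Optional at the API boundary,
 *  pass a nullable timestamp (here a plain Long) through internal calls. */
public class SnapshotLookup {
    /** Public API without time travel delegates with a null timestamp. */
    public Optional<String> getTable(String id) {
        return getTable(id, null);
    }

    /** Time-travel variant: a nullable parameter instead of Optional<Long>. */
    public Optional<String> getTable(String id, Long timestampMillis) {
        if (timestampMillis == null) {
            return Optional.of(id + "@latest");
        }
        return Optional.of(id + "@" + timestampMillis);
    }

    public static void main(String[] args) {
        SnapshotLookup lookup = new SnapshotLookup();
        System.out.println(lookup.getTable("t").get());       // t@latest
        System.out.println(lookup.getTable("t", 123L).get()); // t@123
    }
}
```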
[GitHub] [flink] liuyongvs commented on pull request #22745: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.
liuyongvs commented on PR #22745: URL: https://github.com/apache/flink/pull/22745#issuecomment-1634235602 Hi @snuyanzin do you have time?
[GitHub] [flink] XComp commented on pull request #22984: [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes
XComp commented on PR #22984: URL: https://github.com/apache/flink/pull/22984#issuecomment-1634211308 Yup, I agree. I feel like it makes sense to summarize everything in a FLIP. I created [FLIP-335](https://cwiki.apache.org/confluence/display/FLINK/FLIP-335%3A+removing+flink's+time+classes) and started a [discussion thread](https://lists.apache.org/thread/48ysrg1rrtl8s1twg9wmx35l201hnc2w).
[GitHub] [flink] zentol commented on pull request #22984: [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes
zentol commented on PR #22984: URL: https://github.com/apache/flink/pull/22984#issuecomment-1634202032 > I assumed the [discussion mentioned in FLINK-14068](https://lists.apache.org/thread/76yywnwf3lk8qn4dby0vz7yoqx7f7pkj) and [FLINK-14068](https://issues.apache.org/jira/browse/FLINK-14068) itself are good enough as a conclusion. Which is a fair point, but I'm just not sure if a discussion thread from 4 years ago, where 2.0 wasn't something tangible, meets our current bar for making breaking changes :thinking:
[GitHub] [flink] fredia commented on pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices
fredia commented on PR #22890: URL: https://github.com/apache/flink/pull/22890#issuecomment-1634170838 @AlexYinHan Thanks for the review and suggestion, I have addressed your comments. It would be great if you could take another look.
[GitHub] [flink] fredia commented on a diff in pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices
fredia commented on code in PR #22890: URL: https://github.com/apache/flink/pull/22890#discussion_r1262496038 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java: ## @@ -75,4 +77,14 @@ CheckpointStreamFactory resolveCheckpointStorageLocation( * @return A toolset for additional operations for state owned by tasks. */ CheckpointStateToolset createTaskOwnedCheckpointStateToolset(); + +/** + * Return {@link org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess} if + * file merging is enabled. Otherwise, return itself. + */ +default CheckpointStorageWorkerView toFileMergingStorage( Review Comment: Thanks for your suggestion, putting `toFileMergingStorage` into `CheckpointStorageWorkerView` is to reduce the [type checking in StreamTask](https://github.com/apache/flink/pull/22890#discussion_r1251648094).
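The design point in that reply — a `default` method on the interface so `StreamTask` never needs an `instanceof` check — can be shown in miniature. The interfaces below are trimmed, hypothetical stand-ins for `CheckpointStorageWorkerView` and `FsMergingCheckpointStorageAccess`:

```java
/** Default-method pattern: callers ask the view to upgrade itself; plain
 *  implementations return `this`, so no type checks appear at call sites. */
public class ToFileMergingDemo {
    interface WorkerView {
        String name();

        /** Non-merging implementations inherit this identity default. */
        default WorkerView toFileMergingStorage(boolean fileMergingEnabled) {
            return this;
        }
    }

    /** A storage access that knows how to produce a merging variant. */
    static class FsAccess implements WorkerView {
        public String name() { return "fs"; }

        @Override
        public WorkerView toFileMergingStorage(boolean fileMergingEnabled) {
            return fileMergingEnabled ? new MergingAccess() : this;
        }
    }

    static class MergingAccess implements WorkerView {
        public String name() { return "fs-merging"; }
    }

    public static void main(String[] args) {
        // Call site (think StreamTask): one polymorphic call, no instanceof.
        System.out.println(new FsAccess().toFileMergingStorage(true).name());  // fs-merging
        System.out.println(new FsAccess().toFileMergingStorage(false).name()); // fs
    }
}
```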
[GitHub] [flink] fredia commented on a diff in pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices
fredia commented on code in PR #22890: URL: https://github.com/apache/flink/pull/22890#discussion_r1262494966 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** An implementation of file merging checkpoint storage to file systems. */ +public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess { + +/** FileMergingSnapshotManager manage files and meta information for checkpoint. */ +private final FileMergingSnapshotManager fileMergingSnapshotManager; + +/** The identity of subtask. 
*/ +private final FileMergingSnapshotManager.SubtaskKey subtaskKey; + +public FsMergingCheckpointStorageAccess( +FileSystem fs, +Path checkpointBaseDirectory, +Path sharedStateDirectory, +Path taskOwnedStateDirectory, +@Nullable Path defaultSavepointDirectory, +JobID jobId, +int fileSizeThreshold, +int writeBufferSize, +FileMergingSnapshotManager fileMergingSnapshotManager, +Environment environment) +throws IOException { +super( +fs, +checkpointBaseDirectory, +defaultSavepointDirectory, +jobId, +fileSizeThreshold, +writeBufferSize); +this.fileMergingSnapshotManager = fileMergingSnapshotManager; +this.subtaskKey = +new SubtaskKey( + OperatorID.fromJobVertexID(environment.getJobVertexId()), +environment.getTaskInfo()); +initSegmentSnapshotManager( Review Comment: Right, `SegmentSnapshotManager` should be `FileMergingSnapshotManager`. 👍I put these two lines of code at the end of the constructor as your suggestion. ## flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** An implementation of file merging checkpoint storage to file systems. */ +public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess { + +/** FileMergingSnapshotManager manage files and meta i
[GitHub] [flink] lsyldliu commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
lsyldliu commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1262402227 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalGlobalRuntimeFilterBuilder.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter.BatchExecGlobalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel; +import org.apache.flink.table.planner.plan.optimize.program.FlinkRuntimeFilterProgram; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.type.RelDataType; + +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig; + +/** + * Batch physical RelNode responsible for aggregating all received filters into a global filter. See + * {@link FlinkRuntimeFilterProgram} for more info. + */ +public class BatchPhysicalGlobalRuntimeFilterBuilder extends SingleRel implements BatchPhysicalRel { Review Comment: Please override the `explainTerms` method, it gives a meaningful description for runtime debugging. We can print the corresponding fields of buildIndices, maxRowCount. ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecGlobalRuntimeFilterBuilder.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.List; + +/** Batch {@link ExecNode} for global runtime filter builder. */ +public class BatchExecGlobalRuntimeFilterBuilder extends ExecNodeBase +implements BatchExecNode { + +private final int maxRowCount; + +public BatchExecGlobalRuntimeFilterBuilder( +ReadableConfig tableConfig, +List inputProperties, +LogicalType outputType, +String description, +int maxRowCount) { +super( +ExecNodeContext.newNodeId(), + ExecNodeContext.newContext(BatchExecLocalRuntimeFilterBuilder.class), +ExecNodeContext.newPersiste
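The `explainTerms` request above — printing `buildIndices` and `maxRowCount` in the node description — follows Calcite's fluent `RelWriter` style. A self-contained approximation (the `RelWriter` class below is a tiny stand-in for Calcite's interface, not the real API):

```java
import java.util.List;

/** Sketch of an explainTerms-style description for a runtime-filter node:
 *  each item(term, value) appends "term=[value]" to the plan dump. */
public class ExplainTermsDemo {
    /** Minimal stand-in mimicking Calcite's fluent RelWriter.item(...) calls. */
    static class RelWriter {
        private final StringBuilder sb = new StringBuilder();

        RelWriter item(String term, Object value) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(term).append("=[").append(value).append("]");
            return this;
        }

        @Override
        public String toString() {
            return sb.toString();
        }
    }

    static String explainTerms(List<Integer> buildIndices, int maxRowCount) {
        return new RelWriter()
                .item("buildIndices", buildIndices)
                .item("maxRowCount", maxRowCount)
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(explainTerms(List.of(0, 2), 100000));
        // buildIndices=[[0, 2]], maxRowCount=[100000]
    }
}
```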
[GitHub] [flink-web] Limookiplimo opened a new pull request, #663: Refactor grammar on flink-architecture markdown
Limookiplimo opened a new pull request, #663: URL: https://github.com/apache/flink-web/pull/663 Updated grammar on the markdown to improve clarity and accuracy. Added the preposition "with" after "work well" and changed "each of" to "with each of" for consistency.
[jira] [Resolved] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-32049. - Fix Version/s: 1.18.0 1.17.2 Assignee: Rui Fan (was: Qingsheng Ren) Resolution: Fixed > CoordinatedSourceRescaleITCase.testDownscaling fails on AZP > --- > > Key: FLINK-32049 > URL: https://issues.apache.org/jira/browse/FLINK-32049 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.18.0, 1.17.1 >Reporter: Sergey Nuyanzin >Assignee: Rui Fan >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.18.0, 1.17.2 > > Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip > > > CoordinatedSourceRescaleITCase.testDownscaling fails with > {noformat} > May 08 03:19:14 [ERROR] Failures: > May 08 03:19:14 [ERROR] > CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 > May 08 03:19:14 Multiple Failures (1 failure) > May 08 03:19:14 -- failure 1 -- > May 08 03:19:14 [Any cause contains message 'successfully restored > checkpoint'] > May 08 03:19:14 Expecting any element of: > May 08 03:19:14 [org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> May 08 03:19:14 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > May 08 03:19:14 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > May 08 03:19:14 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > May 08 03:19:14 ...(45 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is > suppressed by NoRestartBackoffTimeStrategy > May 08 03:19:14 at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > May 08 03:19:14 at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) > May 08 03:19:14 at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) > May 08 03:19:14 ...(35 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > May 08 03:19:14 java.lang.IllegalStateException: This executor has been > registered. 
> May 08 03:19:14 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > May 08 03:19:14 at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341) > May 08 03:19:14 at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63) > May 08 03:19:14 ...(17 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed)] > May 08 03:19:14 to satisfy the given assertions requirements but none did: > May 08 03:19:14 > May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > May 08 03:19:14 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > May 08 03:19:14 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > May 08 03:19:14 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > May 08 03:19:14 ...(45 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed) > May 08 03:19:14 error: > May 08 03:19:14 Expecting throwable message: > May 08 03:19:14 "Job execution failed." > May 08 03:19:14 to contain: > May 08 03:19:14 "successfully restored checkpoint" > May 08 03:19:14 but did not. > May 08 03:19:14 > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742758#comment-17742758 ] Rui Fan commented on FLINK-32049: - Hi [~Sergey Nuyanzin][~renqs], thanks for the reporting. We have fixed the bug. In theory, this exception cannot happen again. I close this JIRA first, please cc me if it happens again. > CoordinatedSourceRescaleITCase.testDownscaling fails on AZP > --- > > Key: FLINK-32049 > URL: https://issues.apache.org/jira/browse/FLINK-32049 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.18.0, 1.17.1 >Reporter: Sergey Nuyanzin >Assignee: Qingsheng Ren >Priority: Critical > Labels: pull-request-available, test-stability > Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip > > > CoordinatedSourceRescaleITCase.testDownscaling fails with > {noformat} > May 08 03:19:14 [ERROR] Failures: > May 08 03:19:14 [ERROR] > CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 > May 08 03:19:14 Multiple Failures (1 failure) > May 08 03:19:14 -- failure 1 -- > May 08 03:19:14 [Any cause contains message 'successfully restored > checkpoint'] > May 08 03:19:14 Expecting any element of: > May 08 03:19:14 [org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> May 08 03:19:14 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > May 08 03:19:14 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > May 08 03:19:14 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > May 08 03:19:14 ...(45 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is > suppressed by NoRestartBackoffTimeStrategy > May 08 03:19:14 at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > May 08 03:19:14 at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) > May 08 03:19:14 at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) > May 08 03:19:14 ...(35 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > May 08 03:19:14 java.lang.IllegalStateException: This executor has been > registered. 
> May 08 03:19:14 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > May 08 03:19:14 at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341) > May 08 03:19:14 at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63) > May 08 03:19:14 ...(17 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed)] > May 08 03:19:14 to satisfy the given assertions requirements but none did: > May 08 03:19:14 > May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > May 08 03:19:14 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > May 08 03:19:14 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > May 08 03:19:14 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > May 08 03:19:14 ...(45 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed) > May 08 03:19:14 error: > May 08 03:19:14 Expecting throwable message: > May 08 03:19:14 "Job execution failed." > May 08 03:19:14 to contain: > May 08 03:19:14 "successfully restored checkpoint" > May 08 03:19:14 but did not. > May 08 03:19:14 > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742689#comment-17742689 ] Rui Fan edited comment on FLINK-32049 at 7/13/23 10:27 AM: --- Merged via: bc4c21e47040360aab5bcb0f2c18b907b60e7838 1.17: c8b6c79ee57050cea8be81f56b707ccbcc0fdf4d was (Author: fanrui): Merged via: bc4c21e47040360aab5bcb0f2c18b907b60e7838
[GitHub] [flink] 1996fanrui merged pull request #22991: [FLINK-32049][checkpoint] Fix thread-safe bug of channel state executor when some subtasks are closed while other subtasks are starting
1996fanrui merged PR #22991: URL: https://github.com/apache/flink/pull/22991 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on pull request #22991: [FLINK-32049][checkpoint] Fix thread-safe bug of channel state executor when some subtasks are closed while other subtasks are starting
1996fanrui commented on PR #22991: URL: https://github.com/apache/flink/pull/22991#issuecomment-1633970439 Thanks for the review, merging~
[GitHub] [flink] luoyuxia commented on pull request #22904: [FLINK-32481][table] Support type inference for procedure
luoyuxia commented on PR #22904: URL: https://github.com/apache/flink/pull/22904#issuecomment-1633962951 @LadyForest Thanks for your review. I have addressed your comment in this [commit](https://github.com/apache/flink/pull/22904/commits/35a62a159cd1fa45aaa250218f013cd43d8d31d1).
[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #620: [FLINK-32317] Enrich metadata in CR error field
darenwkt commented on code in PR #620: URL: https://github.com/apache/flink-kubernetes-operator/pull/620#discussion_r1262342704 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java: ## @@ -129,26 +144,56 @@ public static Optional getSubstringWithMaxLength(String str, int limit) private static void enrichMetadata( Throwable throwable, FlinkResourceException flinkResourceException, -int lengthThreshold) { +int lengthThreshold, +Map labelMapper) { +if (flinkResourceException.getAdditionalMetadata() == null) { +flinkResourceException.setAdditionalMetadata(new HashMap<>()); +} + if (throwable instanceof RestClientException) { -flinkResourceException.setAdditionalMetadata( -Map.of( +flinkResourceException +.getAdditionalMetadata() +.put( "httpResponseCode", -((RestClientException) throwable).getHttpResponseStatus().code())); +((RestClientException) throwable).getHttpResponseStatus().code()); } if (throwable instanceof DeploymentFailedException) { getSubstringWithMaxLength( ((DeploymentFailedException) throwable).getReason(), lengthThreshold) .ifPresent( reason -> - flinkResourceException.setAdditionalMetadata( -Map.of("reason", reason))); +flinkResourceException +.getAdditionalMetadata() +.put("reason", reason)); } +labelMapper +.entrySet() +.forEach( +(entry) -> { +Pattern pattern = Pattern.compile(entry.getKey()); + +org.apache.flink.util.ExceptionUtils.findThrowable( +throwable, t -> pattern.matcher(t.getMessage()).find()) Review Comment: Hi Gyula, thanks for catching this, I have fixed the code and added the test, thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
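The review above walks an exception's cause chain and attaches a metadata label for the first configured regex that matches a throwable's message. The idea can be illustrated with a self-contained sketch (plain Java; `ErrorLabelSketch` and its method names are invented here, standing in for the operator's use of `ExceptionUtils.findThrowable` plus a label mapper). Note the null-message guard: `Pattern.matcher(t.getMessage())` throws a `NullPointerException` when a throwable in the chain has no message, which is exactly the kind of edge case such a predicate needs to handle.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Illustrative sketch, not the operator's actual code: scan a throwable's
// cause chain and return the label of the first regex that matches a message.
public class ErrorLabelSketch {

    /** Label of the first pattern matching any message in the cause chain, if any. */
    public static Optional<String> firstLabel(Throwable root, Map<String, String> labelMapper) {
        for (Map.Entry<String, String> entry : labelMapper.entrySet()) {
            Pattern pattern = Pattern.compile(entry.getKey());
            for (Throwable t = root; t != null; t = t.getCause()) {
                // Guard against null messages, which Pattern.matcher would reject with an NPE.
                String message = t.getMessage();
                if (message != null && pattern.matcher(message).find()) {
                    return Optional.of(entry.getValue());
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> mapper = new LinkedHashMap<>();
        mapper.put("quota.*exceeded", "QUOTA");
        Throwable chained = new RuntimeException("deploy failed",
                new IllegalStateException("quota for pods exceeded"));
        System.out.println(firstLabel(chained, mapper).orElse("NONE")); // prints QUOTA
    }
}
```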
[GitHub] [flink] luoyuxia commented on a diff in pull request #22904: [FLINK-32481][table] Support type inference for procedure
luoyuxia commented on code in PR #22904: URL: https://github.com/apache/flink/pull/22904#discussion_r1262339392 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java: ## @@ -133,10 +149,19 @@ private static class DefaultAnnotationHelper { // no implementation } +@ProcedureHint Review Comment: Thanks for the suggestion. I like it.
[GitHub] [flink] PatrickRen commented on a diff in pull request #22924: [FLINK-32404][table] Add catalog modification listener interface and create listener for catalog manager
PatrickRen commented on code in PR #22924: URL: https://github.com/apache/flink/pull/22924#discussion_r1262307415 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java: ## @@ -25,14 +25,22 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.listener.CatalogModificationEvent; +import org.apache.flink.table.catalog.listener.CatalogModificationListener; +import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory; import org.apache.flink.types.Row; -import org.junit.Test; +import org.junit.jupiter.api.Test; Review Comment: Nit: we can remove the `public` keyword before all test cases if we update the framework to JUnit 5 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java: ## @@ -25,14 +25,22 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.listener.CatalogModificationEvent; +import org.apache.flink.table.catalog.listener.CatalogModificationListener; +import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory; import org.apache.flink.types.Row; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; 
Review Comment: Please rewrite assertions using AssertJ (`assertThat`) to align with other test cases. ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java: ## @@ -164,4 +170,30 @@ public static boolean isLegacyConnectorOptions( } } } + +/** Find and create modification listener list from configuration. */ +public static List findCatalogModificationListenerList( +final ReadableConfig configuration, final ClassLoader classLoader) { Review Comment: I found 4 usages of this method, but the test case only covers one of them (the `TableEnvironmentImpl` one). What about adding cases for the other usages? I'm a bit concerned about whether the `configuration` is passed in correctly end-to-end (like when users set options in an SQL client session, TableEnv, etc.), instead of being intercepted by a hidden `new Configuration()` that messes everything up.
[jira] [Created] (FLINK-32588) Flink ML unittest BoundedPerRoundStreamIterationITCase failed
Jiang Xin created FLINK-32588: - Summary: Flink ML unittest BoundedPerRoundStreamIterationITCase failed Key: FLINK-32588 URL: https://issues.apache.org/jira/browse/FLINK-32588 Project: Flink Issue Type: Bug Components: Library / Machine Learning Reporter: Jiang Xin Fix For: ml-2.4.0 [https://github.com/apache/flink-ml/actions/runs/5306457279/jobs/9604069705] [https://github.com/apache/flink-ml/actions/runs/5166305530/jobs/9306327867] The error message is as below. {code:java} Error: testPerRoundIterationWithState Time elapsed: 7.192 s <<< FAILURE! 620java.lang.AssertionError: expected:<3> but was:<4> 621 at org.junit.Assert.fail(Assert.java:89) 622 at org.junit.Assert.failNotEquals(Assert.java:835) 623 at org.junit.Assert.assertEquals(Assert.java:647) 624 at org.junit.Assert.assertEquals(Assert.java:633) 625 at org.apache.flink.test.iteration.BoundedPerRoundStreamIterationITCase.testPerRoundIterationWithState(BoundedPerRoundStreamIterationITCase.java:170) 626 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 627 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 628 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 629 at java.lang.reflect.Method.invoke(Method.java:498) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] LadyForest commented on a diff in pull request #22904: [FLINK-32481][table] Support type inference for procedure
LadyForest commented on code in PR #22904: URL: https://github.com/apache/flink/pull/22904#discussion_r1262153293 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java: ## @@ -146,6 +171,15 @@ private static T defaultAsNull(FunctionHint hint, Function return actualValue; } +private static T defaultAsNull(ProcedureHint hint, Function accessor) { +final T defaultValue = accessor.apply(getDefaultProcedureHintAnnotation()); +final T actualValue = accessor.apply(hint); +if (Objects.deepEquals(defaultValue, actualValue)) { +return null; +} +return actualValue; +} + Review Comment: Nit: What about ```java private static T defaultAsNull(FunctionHint hint, Function accessor) { return defaultAsNull(hint, DEFAULT_ANNOTATION, accessor); } private static T defaultAsNull(ProcedureHint hint, Function accessor) { return defaultAsNull(hint, getDefaultProcedureHintAnnotation(), accessor); } private static T defaultAsNull(H hint, H defaultHint, Function accessor) { final T defaultValue = accessor.apply(defaultHint); final T actualValue = accessor.apply(hint); if (Objects.deepEquals(defaultValue, actualValue)) { return null; } return actualValue; } ```
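The refactor suggested above ("treat a hint field as unset when it equals the default hint's value") can be exercised in isolation. A minimal, self-contained sketch follows; the `Hint` record and `DEFAULT` instance are illustrative stand-ins for Flink's `FunctionHint`/`ProcedureHint` annotations, which are not reproduced here.

```java
import java.util.Objects;
import java.util.function.Function;

// Standalone sketch of the generic defaultAsNull helper from the review:
// a field is mapped to null when it matches the default hint's value.
public class DefaultAsNullSketch {

    record Hint(String output, String[] argumentNames) {}

    static final Hint DEFAULT = new Hint("", new String[0]);

    static <T> T defaultAsNull(Hint hint, Hint defaultHint, Function<Hint, T> accessor) {
        final T defaultValue = accessor.apply(defaultHint);
        final T actualValue = accessor.apply(hint);
        // deepEquals also compares array contents, which plain equals() would not
        if (Objects.deepEquals(defaultValue, actualValue)) {
            return null;
        }
        return actualValue;
    }

    public static void main(String[] args) {
        Hint hint = new Hint("INT", new String[0]);
        System.out.println(defaultAsNull(hint, DEFAULT, Hint::output));        // prints INT
        System.out.println(defaultAsNull(hint, DEFAULT, Hint::argumentNames)); // prints null
    }
}
```

Collapsing the two overloads into one generic helper keeps the comparison logic in a single place, so a future third hint type only needs a one-line delegating overload.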
[jira] [Updated] (FLINK-32481) Support type inference for procedure
[ https://issues.apache.org/jira/browse/FLINK-32481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32481: --- Labels: pull-request-available (was: ) > Support type inference for procedure > > > Key: FLINK-32481 > URL: https://issues.apache.org/jira/browse/FLINK-32481 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Runtime >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > > Currently, FunctionMappingExtractor can only handle the type inference for > procedure. We can extend it to make it can also handle procedure. Since > procedure is much similar to function, we can resue the stack/code of > {{{}FunctionMappingExtractor{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] LadyForest commented on a diff in pull request #22904: [FLINK-32481][table] Support type inference for procedure
LadyForest commented on code in PR #22904: URL: https://github.com/apache/flink/pull/22904#discussion_r1259336884 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java: ## @@ -577,6 +839,18 @@ static TestSpec forTableAggregateFunction( new DataTypeFactoryMock(), function)); } +static TestSpec forProcedure(Class procedure) { +return forProcedure(null, procedure); +} + +static TestSpec forProcedure(String description, Class procedure) { Review Comment: Nit: add `@Nullable` annotation ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java: ## @@ -927,4 +1201,238 @@ private static class DataTypeHintOnScalarFunction extends ScalarFunction { return null; } } + +@ProcedureHint( +input = {@DataTypeHint("INT"), @DataTypeHint("STRING")}, +argumentNames = {"i", "s"}, +output = @DataTypeHint("BOOLEAN")) +private static class FullProcedureHint implements Procedure { +public Boolean[] call(Object procedureContext, Integer i, String s) { +return null; +} +} + +private static class ComplexProcedureHint implements Procedure { +@ProcedureHint( +input = {@DataTypeHint("ARRAY"), @DataTypeHint(inputGroup = InputGroup.ANY)}, +argumentNames = {"myInt", "myAny"}, +output = @DataTypeHint("BOOLEAN"), +isVarArgs = true) +public Boolean[] call(Object procedureContext, Object... 
o) { +return null; +} +} + +@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT")) +@ProcedureHint(input = @DataTypeHint("BIGINT"), output = @DataTypeHint("BIGINT")) +private static class FullProcedureHints implements Procedure { +public Number[] call(Object procedureContext, Number n) { +return null; +} +} + +@ProcedureHint(output = @DataTypeHint("INT")) +private static class GlobalOutputProcedureHint implements Procedure { +@ProcedureHint(input = @DataTypeHint("INT")) +public Integer[] call(Object procedureContext, Integer n) { +return null; +} + +@ProcedureHint(input = @DataTypeHint("STRING")) +public Integer[] call(Object procedureContext, String n) { +return null; +} +} + +@ProcedureHint(output = @DataTypeHint("INT")) +private static class InvalidSingleOutputProcedureHint implements Procedure { +@ProcedureHint(output = @DataTypeHint("TINYINT")) +public Integer call(Object procedureContext, Number n) { +return null; +} +} + +@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT")) +private static class SplitFullProcedureHints implements Procedure { +@ProcedureHint(input = @DataTypeHint("BIGINT"), output = @DataTypeHint("BIGINT")) +public Number[] call(Object procedureContext, Number n) { +return null; +} +} + +@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT")) +private static class InvalidFullOutputProcedureHint implements Procedure { +@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("BIGINT")) +public Number[] call(Object procedureContext, Integer i) { +return null; +} +} + +@ProcedureHint(input = @DataTypeHint("INT"), argumentNames = "a", output = @DataTypeHint("INT")) +private static class InvalidFullOutputProcedureWithArgNamesHint implements Procedure { +@ProcedureHint( +input = @DataTypeHint("INT"), +argumentNames = "b", +output = @DataTypeHint("BIGINT")) +public Number[] call(Object procedureContext, Integer i) { +return null; +} +} + +@ProcedureHint(input = 
@DataTypeHint("INT")) +private static class InvalidLocalOutputProcedureHint implements Procedure { +@ProcedureHint(output = @DataTypeHint("INT")) +public Integer[] call(Object procedureContext, Integer n) { +return null; +} + +@ProcedureHint(output = @DataTypeHint("STRING")) +public Integer[] call(Object procedureContext, String n) { +return null; +} +} + +@ProcedureHint( +input = {@DataTypeHint("INT"), @DataTypeHint()}, +output = @DataTypeHint("BOOLEAN")) +private static class IncompleteProcedureHint implements Procedure { +public Boolean[] call(Object procedureContext, Integer i1, Integer i2) { +return null; +} +} + +@ProcedureHint(input = @DataTypeHint("INT")) +@ProcedureHint(input = @DataTypeHint("BIGINT")) +private static
[jira] [Created] (FLINK-32587) The results returned from the CDC sql query are null or the value was changed unexpectedly
jasonliangyc created FLINK-32587: Summary: The results returned from the CDC sql query are null or the value was changed unexpectedly Key: FLINK-32587 URL: https://issues.apache.org/jira/browse/FLINK-32587 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.17.1, 1.17.0 Reporter: jasonliangyc Attachments: image-2023-07-13-17-35-32-235.png, image-2023-07-13-17-37-56-908.png I created a CDC table as below and then ran the query 'select * from so_cdc' through sql-client, and it gave me unexpected results. {code:java} CREATE TABLE so_cdc ( REC_ID STRING, Create_Date TIMESTAMP(3), PRIMARY KEY (REC_ID) NOT ENFORCED ) WITH ( 'connector' = 'sqlserver-cdc', 'hostname' = '', 'port' = '', 'username' = 'xxx', 'password' = '', 'database-name' = '', 'schema-name' = '', 'table-name' = 'xxx', 'scan.startup.mode' = 'latest-offset' ); {code} Running the query for the first time, the data looks normal. !image-2023-07-13-17-35-32-235.png|width=535,height=141! But after I run the same query multiple times, it returns unexpected data, and I'm sure that these two columns of my CDC source table don't contain this data. !image-2023-07-13-17-37-56-908.png|width=469,height=175! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] PatrickRen commented on a diff in pull request #22869: [FLINK-32403][table] Add database related operations in CatalogManager
PatrickRen commented on code in PR #22869: URL: https://github.com/apache/flink/pull/22869#discussion_r1262279404 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java: ## @@ -70,10 +69,13 @@ public String asSummaryString() { @Override public TableResultInternal execute(Context ctx) { -Catalog catalog = ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName()); try { -catalog.alterDatabase(getDatabaseName(), getCatalogDatabase(), false); +ctx.getCatalogManager() +.alterDatabase( +getCatalogName(), getDatabaseName(), getCatalogDatabase(), false); return TableResultImpl.TABLE_RESULT_OK; +} catch (ValidationException e) { +throw e; Review Comment: Do we need to catch the `ValidationException` if we just re-throw it without any additional action? Same question as in `CreateDatabaseOperation` and `DropDatabaseOperation`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
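The reviewer's question above turns on when a catch-and-rethrow clause is needed at all. A minimal sketch (hypothetical names, not Flink's actual code): by itself, `catch (ValidationException e) { throw e; }` changes nothing, but it does matter when a broader catch clause below would otherwise wrap the exception into a different type, which is the situation the review is probing.

```java
// Sketch of the pattern under review: the explicit rethrow lets a
// ValidationException escape unchanged instead of being wrapped by the
// generic catch clause that follows it.
public class RethrowDemo {

    static class ValidationException extends RuntimeException {
        ValidationException(String msg) { super(msg); }
    }

    static void run(boolean failValidation) {
        try {
            if (failValidation) {
                throw new ValidationException("database does not exist");
            }
            throw new IllegalStateException("catalog unavailable");
        } catch (ValidationException e) {
            throw e; // propagate unchanged instead of falling into the wrapper below
        } catch (Exception e) {
            throw new RuntimeException("Could not execute operation", e);
        }
    }

    public static void main(String[] args) {
        try {
            run(true);
        } catch (RuntimeException e) {
            System.out.println(e.getClass().getSimpleName()); // prints ValidationException
        }
        try {
            run(false);
        } catch (RuntimeException e) {
            System.out.println(e.getClass().getSimpleName()); // prints RuntimeException
        }
    }
}
```

If no broader catch clause exists in the same try statement, the rethrow clause is dead weight and can be removed, which appears to be the point of the review comment.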
[GitHub] [flink] flinkbot commented on pull request #22992: [hotfix] Fix typo in JobManagerOptions
flinkbot commented on PR #22992: URL: https://github.com/apache/flink/pull/22992#issuecomment-1633890956 ## CI report: * 72e8439ce49f3ebfe275a653d6ed41b6977de0ee UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator
[ https://issues.apache.org/jira/browse/FLINK-32586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742730#comment-17742730 ] Lijie Wang commented on FLINK-32586: Thanks for your proposal, it makes sense to me; assigned to you :) [~xiasun] > Enable input locality in SimpleExecutionSlotAllocator > - > > Key: FLINK-32586 > URL: https://issues.apache.org/jira/browse/FLINK-32586 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: xingbe >Assignee: xingbe >Priority: Major > Fix For: 1.18.0 > > > At present, the AdaptiveBatchScheduler uses the > `SimpleExecutionSlotAllocator` to assign slot to execution, but it currently > lacks support for the capability of input locality, which may increase > unnecessary data transmission overhead. In this issue, we aim to enable the > `SimpleExecutionSlotAllocator` to support the input locality.
[jira] [Assigned] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator
[ https://issues.apache.org/jira/browse/FLINK-32586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang reassigned FLINK-32586: -- Assignee: xingbe
[jira] [Updated] (FLINK-32577) Avoid memory fragmentation when running CI for flink-table-planner module
[ https://issues.apache.org/jira/browse/FLINK-32577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunhong Zheng updated FLINK-32577: -- Description: This issue is a sub-issue of FLINK-18356. was:This issue is a sub-issue of FLINK-18356. > Avoid memory fragmentation when running CI for flink-table-planner module > - > > Key: FLINK-32577 > URL: https://issues.apache.org/jira/browse/FLINK-32577 > Project: Flink > Issue Type: Improvement > Components: Build System / CI, Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Yunhong Zheng >Priority: Major > Fix For: 1.18.0 > > > This issue is a sub-issue of FLINK-18356. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] jiaoqingbo opened a new pull request, #22992: [hotfix] Fix typo in JobManagerOptions
jiaoqingbo opened a new pull request, #22992: URL: https://github.com/apache/flink/pull/22992 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742728#comment-17742728 ] Yunhong Zheng commented on FLINK-32579: --- Hi [~jasonliangyc]. Are you using Flink version 1.17.0? I am unable to reproduce the problem from Figure 1 in my local UT case. My sql pattern is: {code:java} SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = '?' and T.a = D.id {code} My rel plan result is: !image-2023-07-13-17-19-26-972.png|width=708,height=49! BTW, I don't quite understand what the problem in your Figure 2 is. Could you clarify? Thanks! > The filter criteria on the lookup table of Lookup join has no effect > - > > Key: FLINK-32579 > URL: https://issues.apache.org/jira/browse/FLINK-32579 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.17.0, 1.17.1 >Reporter: jasonliangyc >Priority: Major > Attachments: image-2023-07-12-09-31-18-261.png, > image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, > image-2023-07-13-17-19-26-972.png > > > *1.* I joined two tables using the lookup join as in the below query in sql-client; > the filter criteria of (p.name = '??') didn't show up in the execution > detail and it returned the rows based only on one condition (cdc.product_id = > p.id) > {code:java} > select > cdc.order_id, > cdc.order_date, > cdc.customer_name, > cdc.price, > p.name > FROM orders AS cdc > left JOIN products > FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and > cdc.product_id = p.id > ; {code} > !image-2023-07-12-09-31-18-261.png|width=657,height=132! > > *2.* It showed weird results when I changed the query as below, because > there was no data in the table (products) where the value of column 'name' is > '??', and the execution detail didn't show us the where criteria. 
> {code:java} > select > cdc.order_id, > cdc.order_date, > cdc.customer_name, > cdc.price, > p.name > FROM orders AS cdc > left JOIN products > FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id > where p.name = '??' > ; {code} > !image-2023-07-12-09-42-59-231.png|width=684,height=102! > !image-2023-07-12-09-47-31-397.png|width=685,height=120! > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
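The distinction the reporter is hitting is standard LEFT JOIN semantics, independent of Flink's lookup-join runtime: a predicate in the ON clause only restricts which lookup rows match (unmatched probe rows are still emitted, padded with NULLs), while the same predicate in a WHERE clause filters the joined result and so drops the NULL-padded rows. A minimal Python simulation of the two behaviors, with toy stand-ins for the `orders` and `products` tables (this sketches SQL semantics only, not Flink's operator):

```python
# Toy stand-ins for the probe table 'orders' and the lookup table 'products'.
orders = [{"order_id": 1, "product_id": 10}, {"order_id": 2, "product_id": 20}]
products = {10: {"id": 10, "name": "??"}, 20: {"id": 20, "name": "other"}}

def left_join_on(pred):
    """LEFT JOIN: probe rows with no matching lookup row survive with NULLs."""
    out = []
    for o in orders:
        p = products.get(o["product_id"])
        if p is not None and pred(o, p):
            out.append({**o, "name": p["name"]})
        else:
            out.append({**o, "name": None})  # left row kept, padded with NULL
    return out

# Predicate in the ON clause: every order is still emitted.
on_result = left_join_on(lambda o, p: p["name"] == "??")

# Same predicate in WHERE: applied after the join, so NULL-padded rows drop out.
where_result = [r for r in left_join_on(lambda o, p: True) if r["name"] == "??"]

print(len(on_result))     # 2 -> both orders appear, one with name=None
print(len(where_result))  # 1 -> only the order whose product name is '??'
```

So even when the planner handles both forms correctly, the two queries are not equivalent for a LEFT join, which is worth keeping in mind when comparing the rel plans in the attached figures.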
[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
[ https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunhong Zheng updated FLINK-32579: -- Attachment: image-2023-07-13-17-19-26-972.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator
xingbe created FLINK-32586: -- Summary: Enable input locality in SimpleExecutionSlotAllocator Key: FLINK-32586 URL: https://issues.apache.org/jira/browse/FLINK-32586 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.18.0 Reporter: xingbe Fix For: 1.18.0 At present, the AdaptiveBatchScheduler uses the `SimpleExecutionSlotAllocator` to assign slots to executions, but it lacks support for input locality, which may incur unnecessary data transmission overhead. In this issue, we aim to enable the `SimpleExecutionSlotAllocator` to support input locality. -- This message was sent by Atlassian Jira (v8.20.10#820010)
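Input locality here means preferring to place a consumer task on a TaskManager that already hosts most of its input partitions, so shuffle data can be read locally instead of over the network. A hypothetical greedy sketch of such a preference; the function name `pick_slot` and the data shapes are illustrative and do not reflect Flink's actual `SimpleExecutionSlotAllocator` API:

```python
from collections import Counter

def pick_slot(input_locations, free_slots):
    """Greedy input-locality preference (illustrative, not Flink's API).

    input_locations: list of TaskManager ids hosting this task's inputs.
    free_slots: dict mapping TaskManager id -> number of free slots.
    Returns the chosen TaskManager id, or None if no capacity exists.
    """
    # Rank TaskManagers by how many of this task's inputs they host.
    for tm, _count in Counter(input_locations).most_common():
        if free_slots.get(tm, 0) > 0:
            free_slots[tm] -= 1
            return tm
    # No co-located slot available: fall back to any TaskManager with capacity.
    for tm, n in free_slots.items():
        if n > 0:
            free_slots[tm] -= 1
            return tm
    return None

slots = {"tm-1": 1, "tm-2": 2}
chosen = pick_slot(["tm-2", "tm-2", "tm-1"], slots)
print(chosen)  # tm-2 hosts most of the inputs, so it is preferred
```

The fallback branch matters: locality is a preference, not a constraint, so a task is never left unscheduled just because no co-located slot is free.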
[jira] [Commented] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742689#comment-17742689 ] Rui Fan commented on FLINK-32049: - Merged via: bc4c21e47040360aab5bcb0f2c18b907b60e7838 > CoordinatedSourceRescaleITCase.testDownscaling fails on AZP > --- > > Key: FLINK-32049 > URL: https://issues.apache.org/jira/browse/FLINK-32049 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.18.0, 1.17.1 >Reporter: Sergey Nuyanzin >Assignee: Qingsheng Ren >Priority: Critical > Labels: pull-request-available, test-stability > Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip > > > CoordinatedSourceRescaleITCase.testDownscaling fails with > {noformat} > May 08 03:19:14 [ERROR] Failures: > May 08 03:19:14 [ERROR] > CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 > May 08 03:19:14 Multiple Failures (1 failure) > May 08 03:19:14 -- failure 1 -- > May 08 03:19:14 [Any cause contains message 'successfully restored > checkpoint'] > May 08 03:19:14 Expecting any element of: > May 08 03:19:14 [org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> May 08 03:19:14 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > May 08 03:19:14 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > May 08 03:19:14 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > May 08 03:19:14 ...(45 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is > suppressed by NoRestartBackoffTimeStrategy > May 08 03:19:14 at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > May 08 03:19:14 at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) > May 08 03:19:14 at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) > May 08 03:19:14 ...(35 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > May 08 03:19:14 java.lang.IllegalStateException: This executor has been > registered. 
> May 08 03:19:14 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > May 08 03:19:14 at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341) > May 08 03:19:14 at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63) > May 08 03:19:14 ...(17 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed)] > May 08 03:19:14 to satisfy the given assertions requirements but none did: > May 08 03:19:14 > May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > May 08 03:19:14 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > May 08 03:19:14 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > May 08 03:19:14 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > May 08 03:19:14 ...(45 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed) > May 08 03:19:14 error: > May 08 03:19:14 Expecting throwable message: > May 08 03:19:14 "Job execution failed." > May 08 03:19:14 to contain: > May 08 03:19:14 "successfully restored checkpoint" > May 08 03:19:14 but did not. > May 08 03:19:14 > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191 -- This message was sent by Atlassian Jira (v8.20.10#820010)
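The `IllegalStateException: This executor has been registered.` in the trace above comes from a `Preconditions.checkState` guard that rejects registering the same subtask executor twice. A minimal Python illustration of that guard pattern (this class is a toy, not Flink's `ChannelStateWriteRequestExecutorImpl`):

```python
class Executor:
    """Toy executor illustrating a double-registration state guard."""

    def __init__(self):
        self._registered = False

    def register_subtask(self):
        # Mirrors a checkState(!registered, ...) style precondition:
        # the second registration attempt fails fast with a clear message.
        if self._registered:
            raise RuntimeError("This executor has been registered.")
        self._registered = True

e = Executor()
e.register_subtask()        # first registration succeeds
try:
    e.register_subtask()    # second registration trips the guard
except RuntimeError as err:
    print(err)              # This executor has been registered.
```

Such fail-fast guards turn a silent double-registration bug into an immediate, attributable failure, which is exactly what surfaced in this test run.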