[GitHub] [flink] luoyuxia commented on a diff in pull request #22977: [FLINK-32569][table] Fix the incomplete serialization of ResolvedCatalogTable caused by the newly introduced time travel interface

2023-07-13 Thread via GitHub


luoyuxia commented on code in PR #22977:
URL: https://github.com/apache/flink/pull/22977#discussion_r1263367999


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:
##
@@ -79,6 +80,11 @@ public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable)
             properties.put(COMMENT, comment);
         }
 
+        final Optional<Long> snapshot = resolvedTable.getSnapshot();
+        if (snapshot.isPresent()) {

Review Comment:
   ```suggestion
   snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId)));
   ```
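For readers unfamiliar with the idiom, here is a minimal self-contained sketch (the `SNAPSHOT` key and `properties` map are stand-ins for the ones in `CatalogPropertiesUtil`) showing that the suggested one-liner behaves the same as the explicit `isPresent()`/`get()` check:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class IfPresentDemo {
    private static final String SNAPSHOT = "snapshot"; // stand-in for the real key

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        Optional<Long> snapshot = Optional.of(42L);

        // Explicit form from the PR:
        if (snapshot.isPresent()) {
            properties.put(SNAPSHOT, Long.toString(snapshot.get()));
        }

        // Equivalent form from the review suggestion:
        snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId)));

        System.out.println(properties); // {snapshot=42}
    }
}
```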



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-32020) Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288

2023-07-13 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren resolved FLINK-32020.
---
Resolution: Fixed

> Enable Dynamic Partition Discovery by Default in Kafka Source based on 
> FLIP-288
> ---
>
> Key: FLINK-32020
> URL: https://issues.apache.org/jira/browse/FLINK-32020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-4.0.0
>
>
> As described in 
> [FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source],
>  dynamic partition discovery is disabled by default, and users have to 
> specify the interval of discovery in order to turn it on.
> This subtask is to enable Dynamic Partition Discovery by Default in Kafka 
> Source.
> Partition discovery is performed on the KafkaSourceEnumerator, which 
> asynchronously fetches topic metadata from the Kafka cluster and checks if 
> there are any new topics and partitions. This should not cause performance 
> issues on the Flink side.
> On the Kafka broker side, partition discovery sends a MetadataRequest to the 
> Kafka broker to fetch topic information. Considering that the Kafka broker 
> has its metadata cache and the default request frequency is relatively low 
> (once every 30 seconds), this is not a heavy operation, and the broker's 
> performance will not be significantly affected.
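For context, a sketch (with placeholder broker/topic names) of how a user sets the discovery interval explicitly on a KafkaSource today; `partition.discovery.interval.ms` is the property whose default behavior FLIP-288 changes:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class DiscoveryIntervalDemo {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder
                .setTopics("input-topic") // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Before FLIP-288, discovery only runs if this interval is set
                // explicitly; the FLIP enables discovery by default.
                .setProperty("partition.discovery.interval.ms", "30000")
                .build();
    }
}
```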



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31953) FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source

2023-07-13 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren resolved FLINK-31953.
---
Resolution: Fixed

> FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source
> ---
>
> Key: FLINK-31953
> URL: https://issues.apache.org/jira/browse/FLINK-31953
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>  Labels: Connector, kafka-source, pull-request-available
> Fix For: kafka-4.0.0
>
>
> This improvement implements 
> [FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source].
>  This FLIP has three main objectives:
>  # Enable partition discovery by default.
>  # Provide an *EARLIEST* strategy for later-discovered partitions.
>  # Reorganize the code logic of the current built-in OffsetsInitializer, then 
> update the JavaDoc to let users know (see the sketch of the builtin 
> initializers below).
> Each objective corresponds to a sub-task.
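For reference, a sketch of the builtin OffsetsInitializer variants that objective 3 reorganizes. These configure offsets for partitions present at startup; per objective 2, partitions discovered later start from EARLIEST:

```java
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class OffsetsInitializerDemo {
    public static void main(String[] args) {
        // Start from committed offsets, falling back to EARLIEST if none exist.
        OffsetsInitializer committed =
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);
        // Other builtin variants whose JavaDoc objective 3 cleans up:
        OffsetsInitializer earliest = OffsetsInitializer.earliest();
        OffsetsInitializer latest = OffsetsInitializer.latest();
        OffsetsInitializer byTimestamp = OffsetsInitializer.timestamp(1_689_206_400_000L);
    }
}
```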



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32020) Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288

2023-07-13 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743029#comment-17743029
 ] 

Qingsheng Ren commented on FLINK-32020:
---

Merged to main: 811716c5155e82fa3bfc47ced53daef53bb99cce

> Enable Dynamic Partition Discovery by Default in Kafka Source based on 
> FLIP-288
> ---
>
> Key: FLINK-32020
> URL: https://issues.apache.org/jira/browse/FLINK-32020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-4.0.0
>
>
> As described in 
> [FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source],
>  dynamic partition discovery is disabled by default, and users have to 
> specify the interval of discovery in order to turn it on.
> This subtask is to enable Dynamic Partition Discovery by Default in Kafka 
> Source.
> Partition discovery is performed on the KafkaSourceEnumerator, which 
> asynchronously fetches topic metadata from the Kafka cluster and checks if 
> there are any new topics and partitions. This should not cause performance 
> issues on the Flink side.
> On the Kafka broker side, partition discovery sends a MetadataRequest to the 
> Kafka broker to fetch topic information. Considering that the Kafka broker 
> has its metadata cache and the default request frequency is relatively low 
> (once every 30 seconds), this is not a heavy operation, and the broker's 
> performance will not be significantly affected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] PatrickRen closed pull request #40: [FLINK-32020] Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288

2023-07-13 Thread via GitHub


PatrickRen closed pull request #40: [FLINK-32020] Enable Dynamic Partition 
Discovery by Default in Kafka Source based on FLIP-288
URL: https://github.com/apache/flink-connector-kafka/pull/40


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jiaoqingbo commented on pull request #22992: [hotfix] Fix typo in JobManagerOptions

2023-07-13 Thread via GitHub


jiaoqingbo commented on PR #22992:
URL: https://github.com/apache/flink/pull/22992#issuecomment-1635348356

   My modification does not seem to be related to the CI error. Please cc 
@wanglijie95 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32335) Fix the Flink ML unittest failure

2023-07-13 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743010#comment-17743010
 ] 

Dong Lin commented on FLINK-32335:
--

Merged to apache/flink-ml master branch 892fb4714c6f7e7055f26b4a2d1e1b36094500be

> Fix the Flink ML unittest failure
> -
>
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> The [github 
> CI](https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620)
>  of Flink ML failed because of the following exception.
>  
> {code:java}
> E   Caused by: java.util.ConcurrentModificationException
> 223E  at 
> java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
> 224E  at 
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
> 225E  at 
> org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
> 226E  at 
> org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
> 227E  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
> 228E  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
> 229E  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
> 230E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> 231E  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> 232E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> 233E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> 234E  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> 235E  at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> 236E  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> 237E  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> 238E  at java.lang.Thread.run(Thread.java:750){code}
>  
>  
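For context, a minimal standalone sketch of the failure mode in the trace above: ArrayDeque's iterator is fail-fast, so mutating the deque while an iteration over it is in progress throws ConcurrentModificationException.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CmeDemo {
    public static void main(String[] args) {
        Deque<String> events = new ArrayDeque<>();
        events.add("a");
        events.add("b");
        // ArrayDeque's iterator is fail-fast: adding to the deque while an
        // iteration is in progress makes the iterator's next() throw
        // java.util.ConcurrentModificationException, as in the CI trace.
        for (String e : events) {
            System.out.println(e);
            events.add("c"); // next() throws on the following iteration
        }
    }
}
```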



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32335) Fix the Flink ML unittest failure

2023-07-13 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved FLINK-32335.
--
Resolution: Fixed

> Fix the Flink ML unittest failure
> -
>
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> The [github 
> CI](https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620)
>  of Flink ML failed because of the following exception.
>  
> {code:java}
> E   Caused by: java.util.ConcurrentModificationException
> 223E  at 
> java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
> 224E  at 
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
> 225E  at 
> org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
> 226E  at 
> org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
> 227E  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
> 228E  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
> 229E  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
> 230E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> 231E  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> 232E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> 233E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> 234E  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> 235E  at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> 236E  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> 237E  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> 238E  at java.lang.Thread.run(Thread.java:750){code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743009#comment-17743009
 ] 

Yunhong Zheng commented on FLINK-32579:
---

Hi, [~jasonliangyc]. I see what is going on now.

For question 1: why is there no filter condition "p.name = ''" on the LookupJoin 
RelNode?
 * This is expected, because the filter condition has been pushed down to the jdbc 
source (the jdbc source supports filter pushdown). A pushed-down condition is 
not displayed in the LookupJoin node.

For question 2: why is the join result wrong?
 * I think this is a bug in the jdbc lookup source. It did not consume the pushed 
filter condition correctly. After reading the code, I suspect this is because the 
jdbc source doesn't apply this filter condition for the dim table.
 * To quickly verify this, you can disable filter pushdown by setting the config 
'table.optimizer.source.predicate-pushdown-enabled' to 'false' (a sketch is shown 
below).
 * Also, if this verifies that the error is caused by the jdbc source, you can ask 
[~ruanhang1993] to take a look. 
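A minimal sketch of that verification step using the Table API (the same key can be set via `SET` in sql-client):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DisablePushdownDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Disable filter pushdown so the condition stays in the plan; if the
        // query result then becomes correct, the pushdown path is the culprit.
        tEnv.getConfig().set("table.optimizer.source.predicate-pushdown-enabled", "false");
    }
}
```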

 

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png, test_case.sql
>
>
> *1.* I joined two tables using lookup join with the below query in sql-client. 
> The filter criterion (p.name = '??') didn't show up in the execution 
> detail, and it returned rows based on only one condition (cdc.product_id = 
> p.id).
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the table (products) where the value of column 'name' is 
> '??', and the execution detail didn't show us the where criterion.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32335) Fix the Flink ML unittest failure

2023-07-13 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-32335:


Assignee: Jiang Xin

> Fix the Flink ML unittest failure
> -
>
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> The [github 
> CI](https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620)
>  of Flink ML failed because of the following exception.
>  
> {code:java}
> E   Caused by: java.util.ConcurrentModificationException
> 223E  at 
> java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
> 224E  at 
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
> 225E  at 
> org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
> 226E  at 
> org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
> 227E  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
> 228E  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
> 229E  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
> 230E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> 231E  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> 232E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> 233E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> 234E  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> 235E  at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> 236E  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> 237E  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> 238E  at java.lang.Thread.run(Thread.java:750){code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 merged pull request #247: [FLINK-32335] Fix the ConcurrentModificationException in the unittest

2023-07-13 Thread via GitHub


lindong28 merged PR #247:
URL: https://github.com/apache/flink-ml/pull/247


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on pull request #247: [FLINK-32335] Fix the ConcurrentModificationException in the unittest

2023-07-13 Thread via GitHub


lindong28 commented on PR #247:
URL: https://github.com/apache/flink-ml/pull/247#issuecomment-1635252080

   Thanks for the PR. LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32590) Fail to read flink parquet filesystem table stored in hive metastore service.

2023-07-13 Thread Guozhen Yang (Jira)
Guozhen Yang created FLINK-32590:


 Summary: Fail to read flink parquet filesystem table stored in 
hive metastore service.
 Key: FLINK-32590
 URL: https://issues.apache.org/jira/browse/FLINK-32590
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
Affects Versions: 1.17.1
Reporter: Guozhen Yang


h2. Summary:

Fail to read flink parquet filesystem table stored in hive metastore service.
h2. The problem:

When I try to read a Flink parquet filesystem table stored in the Hive metastore 
service, I get the following exception.
{noformat}
java.lang.RuntimeException: One or more fetchers have encountered exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
 ~[flink-connector-files-1.17.1.jar:1.17.1]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 ~[flink-connector-files-1.17.1.jar:1.17.1]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
 ~[flink-connector-files-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) 
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
~[flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_345]
Caused by: java.lang.NoSuchMethodError: 
shaded.parquet.org.apache.thrift.TBaseHelper.hashCode(J)I
at org.apache.parquet.format.ColumnChunk.hashCode(ColumnChunk.java:812) 
~[flink-sql-parquet-1.17.1.jar:1.17.1]
at java.util.AbstractList.hashCode(AbstractList.java:541) ~[?:1.8.0_345]
at org.apache.parquet.format.RowGroup.hashCode(RowGroup.java:704) 
~[flink-sql-parquet-1.17.1.jar:1.17.1]
at java.util.HashMap.hash(HashMap.java:340) ~[?:1.8.0_345]
at java.util.HashMap.put(HashMap.java:613) ~[?:1.8.0_345]
at 
org.apache.parquet.format.converter.ParquetMetadataConverter.generateRowGroupOffsets(ParquetMetadataConverter.java:1411)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.format.converter.ParquetMetadataConverter.access$600(ParquetMetadataConverter.java:144)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1461)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1437)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.format.converter.ParquetMetadataConverter$RangeMetadataFilter.accept(ParquetMetadataConverter.java:1207)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:1437)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:583)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:777) 
~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658) 
~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:127)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat
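As a general debugging aid (not part of the report): a NoSuchMethodError like this usually means two different versions of the shaded thrift classes ended up on the classpath. A quick sketch to print which jar the conflicting class is actually loaded from:

```java
public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        // The method in the trace is missing at runtime; printing the code
        // source shows which jar supplied the class (null for bootstrap classes).
        Class<?> c = Class.forName("shaded.parquet.org.apache.thrift.TBaseHelper");
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
    }
}
```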

[GitHub] [flink] flinkbot commented on pull request #22994: [FLINK-32354][table] Supports executing call procedure statement

2023-07-13 Thread via GitHub


flinkbot commented on PR #22994:
URL: https://github.com/apache/flink/pull/22994#issuecomment-1635219756

   
   ## CI report:
   
   * c7abe7f02a916120208e6f9b80513bf6b6385b9a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32354) Support to execute the call procedure operation

2023-07-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32354:
---
Labels: pull-request-available  (was: )

> Support to execute the call procedure operation
> ---
>
> Key: FLINK-32354
> URL: https://issues.apache.org/jira/browse/FLINK-32354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia opened a new pull request, #22994: [FLINK-32354][table] Supports executing call procedure statement

2023-07-13 Thread via GitHub


luoyuxia opened a new pull request, #22994:
URL: https://github.com/apache/flink/pull/22994

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263236647


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Collection of {@link ConfigOption} used in GlueCatalog. */
+@Internal
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final String IDENTIFIER = "glue";
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();

Review Comment:
   explained here 
[link](https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263235270)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263235270


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Collection of {@link ConfigOption} used in GlueCatalog. */
+@Internal
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final String IDENTIFIER = "glue";
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ID =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_ACCOUNT_ID =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();

Review Comment:
   In the `GlueCatalogOptions` class, `GLUE_ACCOUNT_ID` is an optional config 
that the user can pass, in combination with `GLUE_CATALOG_ENDPOINT`, to connect 
to a different Glue account. 
   One use case is creating two catalogs in the same session: one catalog can 
point to one Glue account (possibly in a different AWS account) and the other 
to a different Glue account (see the sketch below). 
   
   It is used in the method 
   
   ```
   public static Set<ConfigOption<?>> getAllConfigOptions() {
   ```
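For illustration, a hypothetical sketch of that use case, assuming the `glue` factory identifier and the `aws.glue.*` option keys proposed in this PR (endpoints and account IDs are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TwoGlueCatalogsDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Two Glue catalogs in one session, pointing at different
        // accounts/endpoints (option keys as defined in this PR).
        tEnv.executeSql(
                "CREATE CATALOG glue_a WITH ("
                        + " 'type' = 'glue',"
                        + " 'aws.glue.endpoint' = 'https://glue.us-east-1.amazonaws.com',"
                        + " 'aws.glue.account-id' = '111111111111')");
        tEnv.executeSql(
                "CREATE CATALOG glue_b WITH ("
                        + " 'type' = 'glue',"
                        + " 'aws.glue.endpoint' = 'https://glue.eu-west-1.amazonaws.com',"
                        + " 'aws.glue.account-id' = '222222222222')");
    }
}
```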



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263235270


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Collection of {@link ConfigOption} used in GlueCatalog. */
+@Internal
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final String IDENTIFIER = "glue";
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ID =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_ACCOUNT_ID =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();

Review Comment:
   In the `GlueCatalogOptions` class, `GLUE_ACCOUNT_ID` is an optional config 
that the user can pass to connect to a different Glue account. 
   One use case is creating two catalogs in the same session: one catalog can 
point to one Glue account (possibly in a different AWS account) and the other 
to a different Glue account. 
   
   It is used in the method 
   
   ```
   public static Set<ConfigOption<?>> getAllConfigOptions() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263235270


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Collection of {@link ConfigOption} used in GlueCatalog. */
+@Internal
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final String IDENTIFIER = "glue";
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ID =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_ACCOUNT_ID =
+            ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();

Review Comment:
   In the `GlueCatalogOptions` class, `GLUE_ACCOUNT_ID` is an optional config 
that the user can pass to connect to a different Glue account. 
   One use case is creating two catalogs in the same session: one catalog can 
point to one Glue account (possibly in a different AWS account) and the other 
to a different Glue account. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263233465


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/AWSGlueConfigConstants.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.constants;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Configuration keys for AWS Glue Data Catalog service usage. */
+@PublicEvolving
+public class AWSGlueConfigConstants {
+
+/**
+ * Configure an alternative endpoint of the Glue service for GlueCatalog 
to access.
+ *
+ * This could be used to use GlueCatalog with any glue-compatible 
metastore service that has
+ * a different endpoint
+ */
+public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint";

Review Comment:
   This constant is used to define the catalog's config options: 
   
   ```
   public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
           ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT)
                   .stringType()
                   .noDefaultValue();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263232863


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/AWSGlueConfigConstants.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.constants;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Configuration keys for AWS Glue Data Catalog service usage. */
+@PublicEvolving
+public class AWSGlueConfigConstants {
+
+/**
+ * Configure an alternative endpoint of the Glue service for GlueCatalog 
to access.
+ *
+ * This could be used to use GlueCatalog with any glue-compatible 
metastore service that has
+ * a different endpoint
+ */
+public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint";
+
+/**
+ * The ID of the Glue Data Catalog where the tables reside. If none is 
provided, Glue
+ * automatically uses the caller's AWS account ID by default.
+ *
+ * <p>For more details, see
+ * <a href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html">...</a>
+ */
+public static final String GLUE_CATALOG_ID = "aws.glue.id";
+
+/**
+ * The account ID used in a Glue resource ARN, e.g.
+ * arn:aws:glue:us-east-1:1:table/db1/table1
+ */
+public static final String GLUE_ACCOUNT_ID = "aws.glue.account-id";

Review Comment:
   
   ```
   public static final ConfigOption<String> GLUE_ACCOUNT_ID =
           ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1263232863


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/AWSGlueConfigConstants.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.constants;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Configuration keys for AWS Glue Data Catalog service usage. */
+@PublicEvolving
+public class AWSGlueConfigConstants {
+
+/**
+ * Configure an alternative endpoint of the Glue service for GlueCatalog 
to access.
+ *
+ * This could be used to use GlueCatalog with any glue-compatible 
metastore service that has
+ * a different endpoint
+ */
+public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint";
+
+/**
+ * The ID of the Glue Data Catalog where the tables reside. If none is 
provided, Glue
+ * automatically uses the caller's AWS account ID by default.
+ *
+ * <p>For more details, see
+ * <a href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html">...</a>
+ */
+public static final String GLUE_CATALOG_ID = "aws.glue.id";
+
+/**
+ * The account ID used in a Glue resource ARN, e.g.
+ * arn:aws:glue:us-east-1:1:table/db1/table1
+ */
+public static final String GLUE_ACCOUNT_ID = "aws.glue.account-id";

Review Comment:
   It is used in 
   ```
   public static final ConfigOption<String> GLUE_ACCOUNT_ID =
           ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize

2023-07-13 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee resolved FLINK-28743.
-
Resolution: Fixed

fixed in master: 8829b5e60a71b871462d1d2bb849c926b0de9b80

> Support validating the determinism for StreamPhysicalMatchRecognize
> ---
>
> Key: FLINK-28743
> URL: https://issues.apache.org/jira/browse/FLINK-28743
> Project: Flink
>  Issue Type: Sub-task
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> MatchRecognize has complex expressions and is not commonly used in 
> traditional SQLs, so mark this as a minor issue (for 1.16)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lincoln-lil merged pull request #22981: [FLINK-28743][table-planner] Supports validating the determinism for StreamPhysicalMatchRecognize

2023-07-13 Thread via GitHub


lincoln-lil merged PR #22981:
URL: https://github.com/apache/flink/pull/22981


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jiaoqingbo commented on pull request #22992: [hotfix] Fix typo in JobManagerOptions

2023-07-13 Thread via GitHub


jiaoqingbo commented on PR #22992:
URL: https://github.com/apache/flink/pull/22992#issuecomment-1635163141

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-32481) Support type inference for procedure

2023-07-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-32481.
--
Resolution: Fixed

master:

e5324c085f627df2f8e452b0aec3264fe0c6f6f6

> Support type inference for procedure
> 
>
> Key: FLINK-32481
> URL: https://issues.apache.org/jira/browse/FLINK-32481
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Currently, FunctionMappingExtractor can only handle the type inference for 
> functions. We can extend it so that it can also handle procedures. Since a 
> procedure is much similar to a function, we can reuse the stack/code of 
> {{{}FunctionMappingExtractor{}}} (see the sketch of a procedure below).
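For illustration, a minimal sketch of a procedure whose argument/output types such an extractor would infer, assuming the `Procedure`/`ProcedureContext` interfaces introduced for procedures (class and package names may differ slightly from the final API):

```java
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;

// Illustrative procedure: the extractor must infer (BIGINT) -> ARRAY<BIGINT>
// from this call() signature, analogous to a function's eval() method.
public class GenerateSequenceProcedure implements Procedure {
    public long[] call(ProcedureContext context, long n) {
        long[] sequence = new long[(int) n];
        for (int i = 0; i < n; i++) {
            sequence[i] = i;
        }
        return sequence;
    }
}
```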



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia merged pull request #22904: [FLINK-32481][table] Support type inference for procedure

2023-07-13 Thread via GitHub


luoyuxia merged PR #22904:
URL: https://github.com/apache/flink/pull/22904


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #22904: [FLINK-32481][table] Support type inference for procedure

2023-07-13 Thread via GitHub


luoyuxia commented on PR #22904:
URL: https://github.com/apache/flink/pull/22904#issuecomment-1635130068

   @LadyForest Thanks for reviewing. Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32587) The results returned from the CDC sql query are null or the value was changed unexpectedly

2023-07-13 Thread jasonliangyc (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc closed FLINK-32587.

Resolution: Cannot Reproduce

> The results returned from the CDC sql query are null or the value was changed 
> unexpectedly
> 
>
> Key: FLINK-32587
> URL: https://issues.apache.org/jira/browse/FLINK-32587
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Critical
> Attachments: image-2023-07-13-17-35-32-235.png, 
> image-2023-07-13-17-37-56-908.png
>
>
> I created a CDC table as below and then ran the query 'select * from so_cdc' 
> through sql-client; it gave me unexpected results.
> {code:java}
> CREATE TABLE so_cdc (
>    REC_ID STRING,
>    Create_Date TIMESTAMP(3),
>    PRIMARY KEY (REC_ID) NOT ENFORCED
>  ) WITH (
>     'connector' = 'sqlserver-cdc',
>     'hostname' = '',
>     'port' = '',
>     'username' = 'xxx',
>     'password' = '',
>     'database-name' = '',
>     'schema-name' = '',
>     'table-name' = 'xxx',
>     'scan.startup.mode' = 'latest-offset'
>  ); {code}
> When I run the query for the first time, the data looks normal.
> !image-2023-07-13-17-35-32-235.png|width=535,height=141!
>  
> But after I run the same query multiple times, it gives me unexpected 
> data, and I'm sure that these two columns of my CDC source table don't 
> contain these values.
> !image-2023-07-13-17-37-56-908.png|width=469,height=175!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31830) Coalesce on nested fields with different nullabilities will get wrong plan

2023-07-13 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742968#comment-17742968
 ] 

Sergey Nuyanzin commented on FLINK-31830:
-

Thanks for the very detailed investigation.
The main issue here could be connector support: connectors are tested against 
master and the 2 previous major releases...
Even if such changes are applied to master, connectors using these types cannot 
benefit from them for a long period of time, since they still have to support 
old releases.
I wonder if we can keep the current method/constructor signature and e.g. tweak 
{{org.apache.flink.table.api.DataTypes#ROW(org.apache.flink.table.api.DataTypes.Field...)}}
 in a way that if at least one field is not null, the initial nullability 
of the record is set to {{false}}... Would it work that way?
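
For illustration, a rough sketch of that tweak (assuming {{DataTypes.Field}} 
exposes its {{DataType}}; this is not actual planner code, and the helper name 
is made up):

{code:java}
import java.util.Arrays;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Hypothetical helper: build a ROW type that starts out NOT NULL
// as soon as at least one of its fields is declared NOT NULL.
public static DataType rowWithInferredNullability(DataTypes.Field... fields) {
    DataType row = DataTypes.ROW(fields); // existing behavior: nullable by default
    boolean anyFieldNotNull =
            Arrays.stream(fields)
                    .anyMatch(f -> !f.getDataType().getLogicalType().isNullable());
    return anyFieldNotNull ? row.notNull() : row;
}
{code}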



> Coalesce on nested fields with different nullabilities will get wrong plan
> --
>
> Key: FLINK-31830
> URL: https://issues.apache.org/jira/browse/FLINK-31830
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.6
>Reporter: lincoln lee
>Assignee: Jane Chan
>Priority: Major
> Attachments: image-2023-06-09-15-06-01-322.png, 
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to FLINK-31829, only changing the nullable field `a.np` to 
> not null, gets a wrong plan in 1.14.x (reported by a community user):
> {code}
>   @Test
>   def testCoalesceOnNestedColumns(): Unit = {
> val tEnv = util.tableEnv
> val tableDescriptor = TableDescriptor.forConnector("datagen")
> .schema(Schema.newBuilder
> .column("id", DataTypes.INT.notNull)
> .column("a", DataTypes.ROW(DataTypes.FIELD("np", 
> DataTypes.INT.notNull())).nullable)
> .build)
> .build
> tEnv.createTemporaryTable("t1", tableDescriptor)
> tEnv.createTemporaryTable("t2", tableDescriptor)
> val res = tEnv.executeSql("EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) 
> c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a 
> is null or a.a.np is null")
> res.print()
> }  
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
>+- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>   :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>   +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> the top project in the AST is wrong: in `LogicalProject(id=[$0], 
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`, the 
> `c1=[CAST($1.np):INTEGER]` that relates to `COALESCE(a.a.np, b.a.np) c1` is 
> incorrect,
> but this works fine when using SQL DDL to create the tables:
> {code}
>   @Test
>   def testCoalesceOnNestedColumns2(): Unit = {
> val tEnv = util.tableEnv
> tEnv.executeSql(
>   s"""
>  |create temporary table t1 (
>  |  id int not null,
>  |  a row<np int not null>
>  |) with (
>  | 'connector' = 'datagen'
>  |)
>  |""".stripMargin)
> tEnv.executeSql(
>   s"""
>  |create temporary table t2 (
>  |  id int not null,
>  |  a row<np int not null>
>  |) with (
>  | 'connector' = 'datagen'
>  |)
>  |""".stripMargin)
> val res = tEnv.executeSql(
>   "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, 
> b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np 
> is null")
> res.print()
>   }
> {code}
> Since 1.15, coalesce is a new built-in function, and the AST looks 
> correct in version 1.15+, while before 1.15 it was rewritten as `case when`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32585) Filter javax.xml.bind:jaxb-api false positive for Pulsar connector

2023-07-13 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen closed FLINK-32585.
-
Resolution: Fixed

master via d7a3b3847dc5c680b32d1997d448b7dac44e529c

> Filter javax.xml.bind:jaxb-api false positive for Pulsar connector
> --
>
> Key: FLINK-32585
> URL: https://issues.apache.org/jira/browse/FLINK-32585
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI
>Reporter: Zili Chen
>Assignee: Zili Chen
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] tisonkun merged pull request #22989: [FLINK-32585] Filter javax.xml.bind:jaxb-api false positive for Pulsar connector

2023-07-13 Thread via GitHub


tisonkun merged PR #22989:
URL: https://github.com/apache/flink/pull/22989


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] tisonkun commented on pull request #22989: [FLINK-32585] Filter javax.xml.bind:jaxb-api false positive for Pulsar connector

2023-07-13 Thread via GitHub


tisonkun commented on PR #22989:
URL: https://github.com/apache/flink/pull/22989#issuecomment-1634654818

   Emm..I think I should merge it now so that the SNAPSHOT can be updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] tisonkun commented on pull request #22989: [FLINK-32585] Filter javax.xml.bind:jaxb-api false positive for Pulsar connector

2023-07-13 Thread via GitHub


tisonkun commented on PR #22989:
URL: https://github.com/apache/flink/pull/22989#issuecomment-1634653552

   I'm going to merge this patch tomorrow and check if the Pulsar connector can 
pass with this patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22993: Bump okio from 1.17.2 to 3.4.0 in /flink-end-to-end-tests/flink-end-to-end-tests-sql

2023-07-13 Thread via GitHub


flinkbot commented on PR #22993:
URL: https://github.com/apache/flink/pull/22993#issuecomment-1634603561

   
   ## CI report:
   
   * c1ebb259b21f391fb12eae4d1dd2647e1dd90350 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dependabot[bot] opened a new pull request, #22993: Bump okio from 1.17.2 to 3.4.0 in /flink-end-to-end-tests/flink-end-to-end-tests-sql

2023-07-13 Thread via GitHub


dependabot[bot] opened a new pull request, #22993:
URL: https://github.com/apache/flink/pull/22993

   Bumps [okio](https://github.com/square/okio) from 1.17.2 to 3.4.0.
   
   Changelog
   Sourced from okio's changelog
   (https://github.com/square/okio/blob/master/CHANGELOG.md):

   Version 3.4.0 (2023-07-07)

   - New: Adapt a Java NIO FileSystem (java.nio.file.FileSystem) as an Okio
     FileSystem using fileSystem.asOkioFileSystem().
   - New: Adapt Android's AssetManager as an Okio FileSystem using
     AssetFileSystem. This is in the new okio-assetfilesystem module. Android
     applications should prefer this over FileSystem.RESOURCES as it's faster
     to load.
   - Fix: Don't crash decoding GZIP files when the optional extra data (XLEN)
     is 32 KiB or larger.
   - Fix: Resolve symlinks in FakeFileSystem.canonicalize().
   - Fix: Report the correct createdAtMillis in NodeJsFileSystem file
     metadata. We were incorrectly using ctimeMs, where c means changed, not
     created.
   - Fix: UnsafeCursor is now Closeable.

   Version 3.3.0 (2023-01-07)

   - Fix: Don't leak resources when use {} is used with a non-local return.
     We introduced this performance and stability bug by not considering that
     non-local returns execute neither the return nor catch control flows.
   - Fix: Use a sealed interface for BufferedSink and BufferedSource. These
     were never intended for end-users to implement, and we're happy that
     Kotlin now allows us to express that in our API.
   - New: Change internal locks from synchronized to ReentrantLock and
     Condition. We expect this to help when using Okio with Java virtual
     threads (Project Loom).
   - Upgrade: Kotlin 1.8.0.

   Version 3.2.0 (2022-06-26)

   - Fix: Configure the multiplatform artifact (com.squareup.okio:okio:3.x.x)
     to depend on the JVM artifact (com.squareup.okio:okio-jvm:3.x.x) for
     Maven builds. This should work around an issue where Maven doesn't
     interpret Gradle metadata.
   - Fix: Change CipherSource and CipherSink to recover if the cipher doesn't
     support streaming. This should work around a crash with AES/GCM ciphers
     on Android.
   - New: Enable compatibility with non-hierarchical projects.

   Version 3.1.0 (2022-04-19)

   - Upgrade: Kotlin 1.6.20.
   - New: Support hierarchical project structure. If you're using Okio in a
     multiplatform project please upgrade your project to Kotlin 1.6.20 (or
     newer) to take advantage of this. With hierarchical projects it's easier
     to use properties like FileSystem.SYSTEM that

   ... (truncated)

   Commits

   - a161b07 Prepare for release 3.4.0.
   - c5f462b Copyright to files in build-support (#1285)
   - f21714d Upgrade Gradle and JMH (#1283)
   - 5f5db4a Merge pull request #1284 from square/renovate/com.google.jimfs
   - 8af8d2a Update dependency com.google.jimfs:jimfs to v1.3.0
   - b64c198 Update dependency com.vanniktech:gradle-maven-publish-plugin to
     v0.25.3 (#1282)
   - ea82713 Merge pull request #1281 from square/renovate/gradle-7.x
   - 3569daa Update dependency gradle to v7.6.2
   - e937a50 Merge pull request #1277 from sifmelcara/fix-int-sign-conversion
   - 81bce1a Fix a bug where xlen larger than 0x7fff was rejected (#1280)
   - Additional commits viewable in
     https://github.com/square/okio/compare/okio-parent-1.17.2...parent-3.4.0

   Dependabot compatibility score:
   https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.squareup.okio:okio&package-manager=maven&previous-version=1.17.2&new-version=3.4.0
   
   Dependabot will resolve any

[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates

2023-07-13 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742887#comment-17742887
 ] 

Gyula Fora commented on FLINK-32589:


I think this would be a good improvement, but we have to consider and document 
how user-specified overrides will interact with this, how the user can override 
the autoscaler-set parallelisms, or completely clear them.
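
For reference, what has to be carried over is a single map-typed option in the 
Flink configuration; an illustrative snippet (the vertex IDs are made up):

{code}
# map of job-vertex ID -> parallelism, as written by the autoscaler
pipeline.jobvertex-parallelism-overrides: ea632d67b7d595e5b851708ae9ad79d6:4,bc764cd8ddf7a0cff126f51c16239658:2
{code}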

> Carry over parallelism overrides to prevent users from clearing them on 
> updates
> ---
>
> Key: FLINK-32589
> URL: https://issues.apache.org/jira/browse/FLINK-32589
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
>
> The autoscaler currently sets the parallelism overrides via the Flink config 
> {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec 
> updates, special care needs to be taken in order to carry over existing 
> overrides. Otherwise the job will reset to the default parallelism 
> configuration. Users shouldn't have to deal with this. Instead, whenever a 
> new spec is posted which does not contain the overrides, the operator should 
> automatically apply the last-used overrides (if autoscaling is enabled).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates

2023-07-13 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742886#comment-17742886
 ] 

Gyula Fora commented on FLINK-32589:


So just to be clear, do you suggest adding a new status field with the 
autoscaler overrides and always applying them to the spec?

This way we could actually get rid of any spec modification done by the 
autoscaler module and let the reconciler simply apply it without updating it in 
k8s.
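
In CRD terms that could look roughly like this (a sketch only; the status 
field name is made up, not an existing API):

{code}
# FlinkDeployment status sketch: autoscaler overrides live in status, not spec
status:
  autoscalerOverrides:
    ea632d67b7d595e5b851708ae9ad79d6: "4"
    bc764cd8ddf7a0cff126f51c16239658: "2"
{code}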

> Carry over parallelism overrides to prevent users from clearing them on 
> updates
> ---
>
> Key: FLINK-32589
> URL: https://issues.apache.org/jira/browse/FLINK-32589
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
>
> The autoscaler currently sets the parallelism overrides via the Flink config 
> {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec 
> updates, special care needs to be taken in order to carry over existing 
> overrides. Otherwise the job will reset to the default parallelism 
> configuration. Users shouldn't have to deal with this. Instead, whenever a 
> new spec is posted which does not contain the overrides, the operator should 
> automatically apply the last-used overrides (if autoscaling is enabled).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high

2023-07-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32420:
---
Release Note: 
This performance improvement would be good to mention in the release blog post. 

As proven by the micro benchmarks (screenshots attached in the ticket), with 
5000 subtasks the time to calculate the watermark alignment on the JobManager 
dropped by a factor of 76x (7664%). Previously such large jobs were actually at 
high risk of overloading the JobManager; now that's far less likely to happen.
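
To illustrate the algorithmic side of the fix (a simplified sketch only, not 
the actual SourceCoordinator code): keeping the reported watermarks in an 
ordered multiset makes each update O(log n) instead of rescanning all n keys:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified aggregator: tracks the minimum watermark across all subtasks.
class WatermarkAggregatorSketch {
    private final Map<Integer, Long> perSubtask = new HashMap<>();
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    // Returns the aggregated (minimum) watermark after this update; O(log n).
    long update(int subtaskIndex, long watermark) {
        Long previous = perSubtask.put(subtaskIndex, watermark);
        if (previous != null) {
            counts.merge(previous, -1, (a, b) -> a + b == 0 ? null : a + b);
        }
        counts.merge(watermark, 1, Integer::sum);
        return counts.firstKey();
    }
}
{code}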

> Watermark aggregation performance is poor when watermark alignment is enabled 
> and parallelism is high
> -
>
> Key: FLINK-32420
> URL: https://issues.apache.org/jira/browse/FLINK-32420
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: Screenshot 2023-07-13 at 17.19.11.png, Screenshot 
> 2023-07-13 at 17.19.24.png
>
>
> The 
> [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
>  method will find the smallest watermark of all keys as the  
> aggregatedWatermark.
> However, the time complexity of the aggregate method in a WatermarkAlignment 
> updateInterval cycle is O(n*n), because:
>  * Every subtask reports its latest watermark to the SourceCoordinator in a 
> WatermarkAlignment updateInterval cycle
>  * The SourceCoordinator recomputes the smallest watermark across all 
> subtasks on each report
> In general, the key is the subtaskIndex, so the number of keys equals the 
> parallelism. When the parallelism is high, the watermark aggregation 
> performance will be poor.
> h1. Performance Test:
> The parallelism is 1, each subtask reports 20 watermarks, and the 
> aggregate method takes 18.921s. Almost every round takes 950 ms.
>  * If the watermarkAlignment updateInterval is 1s, SourceCoordinator will be 
> very busy.
>  * If it's less than 1s, the Watermark aggregation will be delayed
> I have finished the POC for performance improvement, and reduced Watermark 
> aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6 
> ms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates

2023-07-13 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742865#comment-17742865
 ] 

Maximilian Michels commented on FLINK-32589:


We handle this in our application stack but I think this is something that the 
operator should do for the user. Most users probably do a POST/PUT on the spec 
which will clear any overrides. This will come as a surprise to users.

It is really an implementation detail that the overrides are handled via an 
undocumented Flink configuration option. The overrides are more part of the 
application's status than its actual configuration.

Users can turn off autoscaling which should clear any overrides.

> Carry over parallelism overrides to prevent users from clearing them on 
> updates
> ---
>
> Key: FLINK-32589
> URL: https://issues.apache.org/jira/browse/FLINK-32589
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
>
> The autoscaler currently sets the parallelism overrides via the Flink config 
> {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec 
> updates, special care needs to be taken in order to carry over existing 
> overrides. Otherwise the job will reset to the default parallelism 
> configuration. Users shouldn't have to deal with this. Instead, whenever a 
> new spec is posted which does not contain the overrides, the operator should 
> automatically apply the last-used overrides (if autoscaling is enabled).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #22987: [FLINK-32583][rest] Fix deadlock in RestClient

2023-07-13 Thread via GitHub


XComp commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1262773979


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##
@@ -501,6 +501,22 @@ private  CompletableFuture 
submitRequest(
 }
 });
 
+// [FLINK-32583] If connectFuture failed instantly but channelFuture 
is unresolved, it may
+// mean the executor service Netty is using has shut down, in which 
case the above listener
+// to complete channelFuture will never run
+if (connectFuture.isDone() && !connectFuture.isSuccess() && 
!channelFuture.isDone()) {

Review Comment:
   Should we add a comment why we're not handling the success case? 
Essentially, we're trying to work around a bug in the netty code, aren't we?
   ```
   /**
* Adds the specified listener to this future.  The
* specified listener is notified when this future is
* {@linkplain #isDone() done}.  If this future is already
* completed, the specified listener is notified immediately.
*/
   ```
   The JavaDoc of `Future.addListener` states that the listener would be 
informed if the future is already completed (which includes the successful and 
the exceptional case). But that doesn't match the implementation, apparently.
   
   Right now, we're missing the `isCancelled` and `isSuccess` case handling. 
It's not clear whether these cases can actually happen. But since we're (at 
least) covering the success case in the listener implementation above, wouldn't 
it be reasonable to cover it here as well?
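   
   For illustration, covering all terminal states up front could look roughly 
like this (a sketch of the idea only, not the PR's actual code):
   ```java
   // Sketch: handle every already-terminal state of connectFuture explicitly.
   if (connectFuture.isDone() && !channelFuture.isDone()) {
       if (connectFuture.isSuccess()) {
           channelFuture.complete(connectFuture.channel());
       } else if (connectFuture.isCancelled()) {
           channelFuture.cancel(false);
       } else {
           channelFuture.completeExceptionally(connectFuture.cause());
       }
   }
   ```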



##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##
@@ -207,6 +209,42 @@ public void testRestClientClosedHandling() throws 
Exception {
 }
 }
 
+/**
+ * Tests that the futures returned by {@link RestClient} fail immediately 
if the client is
+ * already closed.
+ *
+ * See FLINK-32583
+ */
+@Test
+public void testCloseClientBeforeRequest() throws Exception {
+// Note that the executor passed to the RestClient constructor is not 
the same as the
+// executor used by Netty
+try (final RestClient restClient =
+new RestClient(new Configuration(), 
Executors.directExecutor())) {
+// Intentionally close the client (and thus also the executor used 
by Netty)
+restClient.close();
+
+CompletableFuture future =
+restClient.sendRequest(
+unroutableIp,
+80,
+new TestMessageHeaders(),
+EmptyMessageParameters.getInstance(),
+EmptyRequestBody.getInstance());
+
+// Call get() on the future with a timeout of 0 so we can test 
that the exception thrown
+// is not a TimeoutException, which is what would be thrown if 
restClient were not
+// already closed
+final ThrowingRunnable getFuture = () -> future.get(0, 
TimeUnit.SECONDS);
+
+final ExecutionException executionException =
+assertThrows(ExecutionException.class, getFuture);
+final Throwable throwable = 
ExceptionUtils.stripExecutionException(executionException);
+assertThat(throwable, instanceOf(IOException.class));
+assertThat(throwable.getMessage(), containsString("RestClient is 
closed"));

Review Comment:
   ```suggestion
   final Throwable cause = assertThrows(ExecutionException.class, 
getFuture).getCause();
   assertThat(cause, instanceOf(IOException.class));
   assertThat(cause.getMessage(), containsString("RestClient is 
closed"));
   ```
   nit: What about extracting the cause right away and doing the assertions on 
the cause? That makes the test more restrictive. We shouldn't expect a 
multiple-layer stacktrace here, should we?



##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##
@@ -501,6 +501,22 @@ private  CompletableFuture 
submitRequest(
 }
 });
 
+// [FLINK-32583] If connectFuture failed instantly but channelFuture 
is unresolved, it may
+// mean the executor service Netty is using has shut down, in which 
case the above listener
+// to complete channelFuture will never run
+if (connectFuture.isDone() && !connectFuture.isSuccess() && 
!channelFuture.isDone()) {
+final String message;
+if (!isRunning.get()) {

Review Comment:
   nit: Could we invert the if condition? ...just for readability purposes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For que

[GitHub] [flink-kubernetes-operator] gyfora merged pull request #630: [hotfix] Fix in-place rescaling for removed overrides

2023-07-13 Thread via GitHub


gyfora merged PR #630:
URL: https://github.com/apache/flink-kubernetes-operator/pull/630


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high

2023-07-13 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742849#comment-17742849
 ] 

Rui Fan commented on FLINK-32420:
-

Thanks [~pnowojski] for updating the performance results here

> Watermark aggregation performance is poor when watermark alignment is enabled 
> and parallelism is high
> -
>
> Key: FLINK-32420
> URL: https://issues.apache.org/jira/browse/FLINK-32420
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: Screenshot 2023-07-13 at 17.19.11.png, Screenshot 
> 2023-07-13 at 17.19.24.png
>
>
> The 
> [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
>  method will find the smallest watermark of all keys as the  
> aggregatedWatermark.
> However, the time complexity of the aggregate method in a WatermarkAlignment 
> updateInterval cycle is O(n*n), because:
>  * Every subtask reports its latest watermark to the SourceCoordinator in a 
> WatermarkAlignment updateInterval cycle
>  * The SourceCoordinator recomputes the smallest watermark across all 
> subtasks on each report
> In general, the key is the subtaskIndex, so the number of keys equals the 
> parallelism. When the parallelism is high, the watermark aggregation 
> performance will be poor.
> h1. Performance Test:
> The parallelism is 1, each subtask reports 20 watermarks, and the 
> aggregate method takes 18.921s. Almost every round takes 950 ms.
>  * If the watermarkAlignment updateInterval is 1s, SourceCoordinator will be 
> very busy.
>  * If it's less than 1s, the Watermark aggregation will be delayed
> I have finished the POC for performance improvement, and reduced Watermark 
> aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6 
> ms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high

2023-07-13 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742846#comment-17742846
 ] 

Piotr Nowojski commented on FLINK-32420:


Yeah... no need for further comments :) Thanks [~fanrui]

 !Screenshot 2023-07-13 at 17.19.11.png|width=600!  !Screenshot 2023-07-13 at 
17.19.24.png|width=600! 

> Watermark aggregation performance is poor when watermark alignment is enabled 
> and parallelism is high
> -
>
> Key: FLINK-32420
> URL: https://issues.apache.org/jira/browse/FLINK-32420
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: Screenshot 2023-07-13 at 17.19.11.png, Screenshot 
> 2023-07-13 at 17.19.24.png
>
>
> The 
> [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
>  method will find the smallest watermark of all keys as the  
> aggregatedWatermark.
> However, the time complexity of the aggregate method in a WatermarkAlignment 
> updateInterval cycle is O(n*n), because:
>  * Every subtask reports its latest watermark to the SourceCoordinator in a 
> WatermarkAlignment updateInterval cycle
>  * The SourceCoordinator recomputes the smallest watermark across all 
> subtasks on each report
> In general, the key is the subtaskIndex, so the number of keys equals the 
> parallelism. When the parallelism is high, the watermark aggregation 
> performance will be poor.
> h1. Performance Test:
> The parallelism is 1, each subtask reports 20 watermarks, and the 
> aggregate method takes 18.921s. Almost every round takes 950 ms.
>  * If the watermarkAlignment updateInterval is 1s, SourceCoordinator will be 
> very busy.
>  * If it's less than 1s, the Watermark aggregation will be delayed
> I have finished the POC for performance improvement, and reduced Watermark 
> aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6 
> ms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32420) Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high

2023-07-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32420:
---
Attachment: Screenshot 2023-07-13 at 17.19.11.png
Screenshot 2023-07-13 at 17.19.24.png

> Watermark aggregation performance is poor when watermark alignment is enabled 
> and parallelism is high
> -
>
> Key: FLINK-32420
> URL: https://issues.apache.org/jira/browse/FLINK-32420
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: Screenshot 2023-07-13 at 17.19.11.png, Screenshot 
> 2023-07-13 at 17.19.24.png
>
>
> The 
> [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
>  method will find the smallest watermark of all keys as the  
> aggregatedWatermark.
> However, the time complexity of the aggregate method in a WatermarkAlignment 
> updateInterval cycle is O(n*n), because:
>  * Every subtask reports its latest watermark to the SourceCoordinator in a 
> WatermarkAlignment updateInterval cycle
>  * The SourceCoordinator recomputes the smallest watermark across all 
> subtasks on each report
> In general, the key is the subtaskIndex, so the number of keys equals the 
> parallelism. When the parallelism is high, the watermark aggregation 
> performance will be poor.
> h1. Performance Test:
> The parallelism is 1, each subtask reports 20 watermarks, and the 
> aggregate method takes 18.921s. Almost every round takes 950 ms.
>  * If the watermarkAlignment updateInterval is 1s, SourceCoordinator will be 
> very busy.
>  * If it's less than 1s, the Watermark aggregation will be delayed
> I have finished the POC for performance improvement, and reduced Watermark 
> aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6 
> ms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates

2023-07-13 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742840#comment-17742840
 ] 

Gyula Fora commented on FLINK-32589:


I think this is not necessarily something that should be handled on the 
operator side; otherwise the user has no way of actually removing the overrides.

If they use kubectl apply (server-side apply) without the override config 
defined, the configs would naturally be merged and carried over. With replace, 
the user can remove the override by simply not setting it.

> Carry over parallelism overrides to prevent users from clearing them on 
> updates
> ---
>
> Key: FLINK-32589
> URL: https://issues.apache.org/jira/browse/FLINK-32589
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
>
> The autoscaler currently sets the parallelism overrides via the Flink config 
> {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec 
> updates, special care needs to be taken in order to carry over existing 
> overrides. Otherwise the job will reset to the default parallelism 
> configuration. Users shouldn't have to deal with this. Instead, whenever a 
> new spec is posted which does not contain the overrides, the operator should 
> automatically apply the last-used overrides (if autoscaling is enabled).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262718932


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##
@@ -0,0 +1,1188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence 
at backend. */
+@PublicEvolving
+public class GlueCa

[GitHub] [flink] wanglijie95 commented on a diff in pull request #22861: [FLINK-32387][runtime] InputGateDeploymentDescriptor uses cache to avoid deserializing shuffle descriptors multiple times

2023-07-13 Thread via GitHub


wanglijie95 commented on code in PR #22861:
URL: https://github.com/apache/flink/pull/22861#discussion_r1262696361


##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java:
##
@@ -139,53 +139,67 @@ public IndexRange getConsumedSubpartitionIndexRange() {
 return consumedSubpartitionIndexRange;
 }
 
-public void loadBigData(@Nullable PermanentBlobService blobService, JobID 
jobId)
+public ShuffleDescriptor[] getShuffleDescriptors() {
+if (inputChannels == null) {
+throw new IllegalStateException("InputChannel should not be 
null.");
+}
+return inputChannels;
+}
+
+public void loadBigDataAndDeserializeShuffleDescriptors(
+@Nullable PermanentBlobService blobService,
+JobID jobId,
+ShuffleDescriptorsCache shuffleDescriptorsCache)
 throws IOException {
-for (int i = 0; i < serializedInputChannels.size(); i++) {
-MaybeOffloaded shuffleDescriptors =
-serializedInputChannels.get(i);
-if (shuffleDescriptors instanceof Offloaded) {
-PermanentBlobKey blobKey =
-((Offloaded) 
shuffleDescriptors)
-.serializedValueKey;
+try {
+if (inputChannels == null) {
+inputChannels = new ShuffleDescriptor[numberOfInputChannels];
+}
 
-Preconditions.checkNotNull(blobService);
+for (MaybeOffloaded 
serializedShuffleDescriptors :

Review Comment:
   Maybe there was some misunderstanding in our offline discussion; I think it 
is ok to introduce `ShuffleDescriptorGroup` (or `ShuffleDescriptorList`), which 
helps to understand the serialization process (the shuffle descriptors in the 
same `ShuffleDescriptorGroup` are serialized together).
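   
   As a sketch, such a group could be as small as this (illustrative shape 
only, reusing the `ShuffleDescriptorAndIndex` type from this PR):
   ```java
   /** Shuffle descriptors that were serialized together as one unit. */
   class ShuffleDescriptorGroup implements java.io.Serializable {
       private final TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]
               shuffleDescriptors;

       ShuffleDescriptorGroup(
               TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]
                       shuffleDescriptors) {
           this.shuffleDescriptors = shuffleDescriptors;
       }

       TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] getShuffleDescriptors() {
           return shuffleDescriptors;
       }
   }
   ```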



##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
+
+/** Cache of shuffle descriptors in TaskExecutor. */
+public interface ShuffleDescriptorsCache {
+/**
+ * Start cache manager.
+ *
+ * @param mainThreadExecutor of main thread executor.
+ */
+void start(ComponentMainThreadExecutor mainThreadExecutor);
+
+/** Stop cache manager. */
+void stop();
+
+/**
+ * Get shuffle descriptors in cache.
+ *
+ * @param blobKey identify the shuffle descriptors
+ * @return shuffle descriptors in cache if exists, otherwise null
+ */
+TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] 
get(PermanentBlobKey blobKey);
+
+/**
+ * Put shuffle descriptors to cache.
+ *
+ * @param jobId of job
+ * @param blobKey identify the shuffle descriptors
+ * @param shuffleDescriptorAndIndices shuffle descriptors to cache
+ */
+void put(
+JobID jobId,
+PermanentBlobKey blobKey,
+TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]
+shuffleDescriptorAndIndices);
+
+/**
+ * Clear all cache of the Job.
+ *
+ * @param jobId of job
+ */
+void clearCacheOfJob(JobID jobId);

Review Comment:
   Maybe `clearCacheForJob`



##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java:
##
@@ -68,6 +69,9 @@
 public class TaskManagerServices {
 private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServices.class);
 
+private static final Duration SHUFFLE_DESCRIPTORS_CACHE_TIMEOUT = 
Duration.ofSeconds(300);
+private static final int SHUFFLE_DESCRIPTORS_CACHE_SIZE_LIMIT = 100;

Review Comment:
   Is this the num of `shuffle descriptors`? or  the num of 
`Blob/ShuffleDescriptorGroup`?



##
flink-runtime/src/main/java/org/apache/f

[jira] [Comment Edited] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742834#comment-17742834
 ] 

jasonliangyc edited comment on FLINK-32579 at 7/13/23 3:09 PM:
---

Hi [~337361...@qq.com]. Yes, I'm using 1.17.0, but I also tried 1.17.1, with 
the same issues.

I can reproduce the problems by using the attached [^test_case.sql]; could you 
try it again by following the setup in that file? Thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if 
{code:java}
where p.name = '??' {code}
then the value of column 'name' returns '??'

if
{code:java}
where p.name = '+' {code}
then the value of column 'name' returns '+'

but there is actually no such data in the 'products' table; it is weird that 
it always returns the same value as the one in the where clause.

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 


was (Author: JIRAUSER301313):
Hi [~337361...@qq.com]. Yes, I'm using the 1.17.0, but i also tried the 1.17.1, 
same issues.

I can reproduce the problems by using the attached [^test_case.sql], could you 
try it again by following the setups in that file? thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if *where p.name = '??'* ,then the value of column 'name' return '??'

if {*}where p.name = '+'{*}, then the value of column 'name' return '+'

but actually there are no such data in the 'products' table, it is weird that 
it always return the same value as the one in the .

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png, test_case.sql
>
>
> *1.* I joined two tables using a lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and rows were returned based on only one condition 
> (cdc.product_id = p.id)
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the table (products) where the value of column 'name' is 
> '??', and the execution detail didn't show the where criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742834#comment-17742834
 ] 

jasonliangyc edited comment on FLINK-32579 at 7/13/23 3:06 PM:
---

Hi [~337361...@qq.com]. Yes, I'm using 1.17.0, but I also tried 1.17.1, with 
the same issues.

I can reproduce the problems by using the attached [^test_case.sql]; could you 
try it again by following the setup in that file? Thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if *where p.name = '??'*, then the value of column 'name' returns '??'

if {*}where p.name = '+'{*}, then the value of column 'name' returns '+'

but there is actually no such data in the 'products' table; it is weird that 
it always returns the same value as the one in the where clause.

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 


was (Author: JIRAUSER301313):
Hi [~337361...@qq.com]. Yes, I'm using the 1.17.0, but i also tried the 1.17.1, 
same issues.

I can reproduce the problems by using the attached [^test_case.sql], could you 
try it again by following the setups in that file? thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if <{*}where p.name = '??'{*}>, then the value of column 'name' return 
'??'

if <{*}where p.name = '+'{*}>, then the value of column 'name' return 
'+'

but actually there are no such data in the 'products' table, it is weird that 
it always return the same value as the one in the .

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png, test_case.sql
>
>
> *1.* I joined two tables using a lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and rows were returned based on only one condition 
> (cdc.product_id = p.id)
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the table (products) where the value of column 'name' is 
> '??', and the execution detail didn't show the where criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742834#comment-17742834
 ] 

jasonliangyc edited comment on FLINK-32579 at 7/13/23 3:05 PM:
---

Hi [~337361...@qq.com]. Yes, I'm using 1.17.0, but I also tried 1.17.1, with 
the same issues.

I can reproduce the problems by using the attached [^test_case.sql]; could you 
try it again by following the setup in that file? Thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if <{*}where p.name = '??'{*}>, then the value of column 'name' returns 
'??'

if <{*}where p.name = '+'{*}>, then the value of column 'name' returns 
'+'

but there is actually no such data in the 'products' table; it is weird that 
it always returns the same value as the one in the where clause.

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 


was (Author: JIRAUSER301313):
Hi [~337361...@qq.com]. Yes, I'm using the 1.17.0, but i also tried the 1.17.1, 
same issues.

I can reproduce the problems by using the attached [^test_case.sql], could you 
try it again by following the setups in that file? thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if <{*}where p.name = '??'{*}>, then the value of column 'name' return 
'??'

if <{*}where p.name = '+'{*}>, then the value of column 'name' return 
'+'

but actually there are no such data in the 'products' table, it is weird that 
it always return the same value as the one in the .

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png, test_case.sql
>
>
> *1.* I joined two tables using a lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and rows were returned based on only one condition 
> (cdc.product_id = p.id)
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the table (products) where the value of column 'name' is 
> '??', and the execution detail didn't show the where criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262700429


##
flink-catalog-aws/flink-catalog-aws-glue/pom.xml:
##
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-catalog-aws-parent</artifactId>
+        <version>4.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
+        </dependency>
+
+        <!-- Test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.22</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                        <configuration>
+                            <excludes>
+                                <exclude>META-INF/LICENSE</exclude>
+                                <exclude>META-INF/NOTICE</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
+

Review Comment:
   It was intended for e2e tests of the catalog. Removing it for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742834#comment-17742834
 ] 

jasonliangyc commented on FLINK-32579:
--

Hi [~337361...@qq.com]. Yes, I'm using 1.17.0, but I also tried 1.17.1 and hit 
the same issues.

I can reproduce the problems with the attached [^test_case.sql]; could you try 
it again by following the setup in that file? Thanks.

!image-2023-07-13-22-35-35-696.png|width=674,height=164!

 

For the second problem, it means:

if <{*}where p.name = '??'{*}>, then the value of column 'name' comes back as 
'??'

if <{*}where p.name = '+'{*}>, then the value of column 'name' comes back as 
'+'

but actually there is no such data in the 'products' table; it is weird that 
it always returns the same value as the one in the WHERE clause.

 
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'{code}
 

!image-2023-07-13-22-38-16-709.png|width=663,height=120!

 

!image-2023-07-13-22-43-45-957.png|width=662,height=141!

 

 

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png, test_case.sql
>
>
> *1.* I joined two tables using the lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and it returned rows based on only one condition 
> (cdc.product_id = p.id).
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the 'products' table where the value of column 'name' is 
> '??', and the execution detail didn't show us the WHERE criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] akalash commented on a diff in pull request #22761: [FLINK-32298][network] Fix the bug that totalWrittenBytes of BufferWritingResultPartition misses some data

2023-07-13 Thread via GitHub


akalash commented on code in PR #22761:
URL: https://github.com/apache/flink/pull/22761#discussion_r1262673283


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java:
##
@@ -65,7 +65,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
 
 private TimerGauge hardBackPressuredTimeMsPerSecond = new TimerGauge();
 
-private long totalWrittenBytes;
+protected long totalWrittenBytes;

Review Comment:
   Ideally, `totalWrittenBytes` should be changed in only one class, 
`BufferWritingResultPartition`. So maybe we should forbid inherited classes 
from writing to `subpartitions` directly. But honestly, I don't know how to do 
that easily, since the current inherited classes use `subpartitions` a lot and 
we cannot change the visibility of `subpartitions` from `protected` to 
`private`. I mean, we can create an `addToSubpartition` method in 
`BufferWritingResultPartition` and use it everywhere (and maybe that will be 
better than now), but it still doesn't protect us from possible future bugs if 
somebody decides to call `subpartitions#add` directly in an inherited class.
   
   Anyway, if you don't think it will make things worse, maybe we should 
indeed create the `add` (or `addToSubpartition`) method in 
`BufferWritingResultPartition` for now and use it everywhere. At least it will 
reduce the number of places where we update `totalWrittenBytes`. 
Unfortunately, I haven't come up with any other solution so far.
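   For illustration only, a toy sketch of that single choke point, using 
simplified stand-in types rather than Flink's actual 
`BufferWritingResultPartition`/`ResultSubpartition` classes:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Simplified stand-ins; not Flink's real partition classes. */
   abstract class WritingPartition {
   
       // Kept private so subclasses can only write through the guarded helper.
       private final List<List<byte[]>> subpartitions = new ArrayList<>();
   
       private long totalWrittenBytes;
   
       WritingPartition(int numSubpartitions) {
           for (int i = 0; i < numSubpartitions; i++) {
               subpartitions.add(new ArrayList<>());
           }
       }
   
       /** The single write path, so the byte counter cannot be bypassed. */
       protected final void addToSubpartition(int index, byte[] data) {
           totalWrittenBytes += data.length;
           subpartitions.get(index).add(data);
       }
   
       long getTotalWrittenBytes() {
           return totalWrittenBytes;
       }
   }
   ```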



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##
@@ -190,51 +195,60 @@ public void recover(
 int oldSubtaskIndex,
 BufferWithContext bufferWithContext)
 throws IOException {
-try (BufferBuilder bufferBuilder = bufferWithContext.context) {
-try (BufferConsumer bufferConsumer =
-bufferBuilder.createBufferConsumerFromBeginning()) {
-bufferBuilder.finish();
-if (bufferConsumer.isDataAvailable()) {
-final List<CheckpointedResultSubpartition> channels =
-getMappedChannels(subpartitionInfo);
-for (final CheckpointedResultSubpartition channel : 
channels) {
-// channel selector is created from the downstream's 
point of view: the
-// subtask of downstream = subpartition index of 
recovered buffer
-final SubtaskConnectionDescriptor channelSelector =
-new SubtaskConnectionDescriptor(
-subpartitionInfo.getSubPartitionIdx(), 
oldSubtaskIndex);
-channel.addRecovered(
-
EventSerializer.toBufferConsumer(channelSelector, false));
-channel.addRecovered(bufferConsumer.copy());
-}
-}
+try (BufferBuilder bufferBuilder = bufferWithContext.context;
+BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumerFromBeginning()) {
+bufferBuilder.finish();
+if (!bufferConsumer.isDataAvailable()) {
+return;
+}
+final List<ResultSubpartitionInfo> mappedSubpartitions =
+getMappedSubpartitions(subpartitionInfo);
+for (final ResultSubpartitionInfo mappedSubpartition : 
mappedSubpartitions) {
+// channel selector is created from the downstream's point of 
view: the
+// subtask of downstream = subpartition index of recovered 
buffer
+final SubtaskConnectionDescriptor channelSelector =
+new SubtaskConnectionDescriptor(
+subpartitionInfo.getSubPartitionIdx(), 
oldSubtaskIndex);
+CheckpointedResultPartition checkpointedResultPartition =
+
getCheckpointedResultPartition(mappedSubpartition.getPartitionIdx());
+checkpointedResultPartition.addRecovered(

Review Comment:
   good. It's better now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc updated FLINK-32579:
-
Attachment: test_case.sql

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png, test_case.sql
>
>
> *1.* I joined two tables using the lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and it returned rows based on only one condition 
> (cdc.product_id = p.id).
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the 'products' table where the value of column 'name' is 
> '??', and the execution detail didn't show us the WHERE criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc updated FLINK-32579:
-
Attachment: image-2023-07-13-22-43-45-957.png

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png
>
>
> *1.* I joined two tables using the lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and it returned rows based on only one condition 
> (cdc.product_id = p.id).
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the 'products' table where the value of column 'name' is 
> '??', and the execution detail didn't show us the WHERE criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc updated FLINK-32579:
-
Attachment: image-2023-07-13-22-43-24-213.png

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png, image-2023-07-13-22-43-24-213.png, 
> image-2023-07-13-22-43-45-957.png
>
>
> *1.* I joined two tables using the lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and it returned rows based on only one condition 
> (cdc.product_id = p.id).
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the 'products' table where the value of column 'name' is 
> '??', and the execution detail didn't show us the WHERE criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262663157


##
flink-catalog-aws/flink-catalog-aws-glue/pom.xml:
##
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-catalog-aws-parent</artifactId>
+        <version>4.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
+        </dependency>
+

Review Comment:
   Removed the URL connection client; it was redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc updated FLINK-32579:
-
Attachment: image-2023-07-13-22-38-16-709.png

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png, 
> image-2023-07-13-22-38-16-709.png
>
>
> *1.* I joined two tables using the lookup join with the query below in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and it returned rows based on only one condition 
> (cdc.product_id = p.id).
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because 
> there is no data in the 'products' table where the value of column 'name' is 
> '??', and the execution detail didn't show us the WHERE criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread jasonliangyc (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc updated FLINK-32579:
-
Attachment: image-2023-07-13-22-35-35-696.png

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png, image-2023-07-13-22-35-35-696.png
>
>
> *1.* I joined two tables using the lookup join as below query in sql-client, 
> the filter criteria of (p.name = '??') didn't shows up in the execution 
> detail and it returned the rows only base on one condiction (cdc.product_id = 
> p.id)
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed the werid results when i changed the query as below, cause 
> there were no data in the table(products) that the value of column 'name' is 
> '??'  and and execution detail didn't show us the where criteria.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-07-13 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1262655614


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueFunctionOperator.java:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+import org.apache.flink.table.resource.ResourceUri;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Utilities for Glue catalog Function related operations. */
+@Internal
+public class GlueFunctionOperator extends GlueOperator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(GlueFunctionOperator.class);
+
+public GlueFunctionOperator(String catalogName, GlueClient glueClient, 
String glueCatalogId) {
+super(catalogName, glueClient, glueCatalogId);
+}
+
+/**
+ * Create a function. Function name should be handled in a 
case-insensitive way.
+ *
+ * @param functionPath path of the function
+ * @param function Flink function to be created
+ * @throws CatalogException in case of any runtime exception
+ */
+public void createGlueFunction(ObjectPath functionPath, CatalogFunction 
function)
+throws CatalogException, FunctionAlreadyExistException {
+
+UserDefinedFunctionInput functionInput = 
createFunctionInput(functionPath, function);
+CreateUserDefinedFunctionRequest.Builder createUDFRequest =
+CreateUserDefinedFunctionRequest.builder()
+.databaseName(functionPath.getDatabaseName())
+.catalogId(getGlueCatalogId())
+.functionInput(functionInput);
+try {
+CreateUserDefinedFunctionResponse response =
+
glueClient.createUserDefinedFunction(createUDFRequest.build());
+GlueUtils.validateGlueResponse(response);
+LOG.info(String.format("Function created. %s", 
functionPath.getFullName()));

Review Comment:
   Ack, modified the code to use the native logger in the PR.
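   For reference, the parameterized SLF4J form looks like:
   
   ```java
   // Placeholder-based logging avoids the String.format call when INFO is disabled.
   LOG.info("Function created. {}", functionPath.getFullName());
   ```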



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Created] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates

2023-07-13 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-32589:
--

 Summary: Carry over parallelism overrides to prevent users from 
clearing them on updates
 Key: FLINK-32589
 URL: https://issues.apache.org/jira/browse/FLINK-32589
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels


The autoscaler currently sets the parallelism overrides via the Flink config 
{{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec 
updates, special care needs to be taken in order to carry over the existing 
overrides; otherwise the job resets to the default parallelism configuration. 
Users shouldn't have to deal with this. Instead, whenever a new spec is posted 
that does not contain the overrides, the operator should automatically apply 
the last-used overrides (if autoscaling is enabled).
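A rough sketch of the intended carry-over rule (the config key is real; the 
map-based config and method name are simplifications for illustration, not 
operator code):
{code:java}
import java.util.HashMap;
import java.util.Map;

class ParallelismOverrideCarryOver {

    static final String OVERRIDES_KEY = "pipeline.jobvertex-parallelism-overrides";

    // If the newly posted spec does not set the overrides, re-apply the
    // last-deployed ones so an unrelated spec update does not reset the
    // parallelism chosen by the autoscaler (only relevant when autoscaling
    // is enabled).
    static Map<String, String> mergeConfig(
            Map<String, String> newSpecConfig, Map<String, String> lastDeployedConfig) {
        Map<String, String> merged = new HashMap<>(newSpecConfig);
        if (!merged.containsKey(OVERRIDES_KEY) && lastDeployedConfig.containsKey(OVERRIDES_KEY)) {
            merged.put(OVERRIDES_KEY, lastDeployedConfig.get(OVERRIDES_KEY));
        }
        return merged;
    }
}
{code}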



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on pull request #22988: [FLINK-32157] Replaces LeaderConnectionInfo with LeaderInformation

2023-07-13 Thread via GitHub


XComp commented on PR #22988:
URL: https://github.com/apache/flink/pull/22988#issuecomment-1634351279

   (sorry for not approving the PR before merging :innocent: )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-32157) Replaces LeaderConnectionInfo with LeaderInformation

2023-07-13 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-32157.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

master: f37d41cf557e9acd113a063dbee442a3a92bf09e

> Replaces LeaderConnectionInfo with LeaderInformation
> 
>
> Key: FLINK-32157
> URL: https://issues.apache.org/jira/browse/FLINK-32157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Matthias Pohl
>Assignee: Jiadong Lu
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.18.0
>
>
> {{LeaderConnectionInfo}} and {{LeaderInformation}} have the same purpose. 
> {{LeaderInformation}} could substitute any occurrences of 
> {{LeaderConnectionInfo}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp merged pull request #22988: [FLINK-32157] Replaces LeaderConnectionInfo with LeaderInformation

2023-07-13 Thread via GitHub


XComp merged PR #22988:
URL: https://github.com/apache/flink/pull/22988


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-07-13 Thread via GitHub


XComp commented on PR #22010:
URL: https://github.com/apache/flink/pull/22010#issuecomment-1634336399

   > In https://github.com/apache/flink/pull/21971 we added 
SequenceGeneratorTest, I think we can add the corresponding test method in 
https://github.com/apache/flink/pull/21971
   
   I'm not a big fan of adding tests that belong in a PR into a separate PR. 
You could just create the test class in this PR with the set of tests that fit 
this PR. PR #21971 should then add additional tests related to its changes. 
The conflict resolution shouldn't be too complex because you're adding 
independent test methods. :thinking: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #22939: [FLINK-32474][table] Support time travel in table planner

2023-07-13 Thread via GitHub


luoyuxia commented on code in PR #22939:
URL: https://github.com/apache/flink/pull/22939#discussion_r1262460439


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -380,7 +380,28 @@ public Optional<ContextResolvedTable> 
getTable(ObjectIdentifier objectIdentifier
 resolveCatalogBaseTable(temporaryTable);
 return 
Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
 } else {
-return getPermanentTable(objectIdentifier);
+return getPermanentTable(objectIdentifier, Optional.empty());
+}
+}
+
+/**
+ * Retrieves a fully qualified table with a specific time. If the path is 
not yet fully

Review Comment:
   nit:
   If the path is not yet fully qualified, use {@link 
#qualifyIdentifier(UnresolvedIdentifier)} first.



##
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SnapshotScope.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.validate;
+
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.sql.SqlNode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+
+/** Represent the snapshot of the {@link SqlValidatorScope} */
+public class SnapshotScope extends DelegatingScope {
+private final SqlValidatorSnapshot sqlValidatorSnapshot;
+
+public SnapshotScope(SqlValidatorScope parent, SchemaVersion 
schemaVersion) {
+super(parent);
+this.sqlValidatorSnapshot =
+new SqlValidatorSnapshot((SqlValidatorImpl) 
parent.getValidator(), schemaVersion);
+}
+
+@Override
+public void resolveTable(
+List<String> names, SqlNameMatcher nameMatcher, Path path, 
Resolved resolved) {
+// In the time travel case, the parent of the  ScopeSnapshot will 
always be CatalogScope

Review Comment:
   ```suggestion
   // In the time travel case, the parent of the ScopeSnapshot will 
always be CatalogScope
   ```



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -446,12 +467,24 @@ public Optional<CatalogPartition> getPartition(
 return Optional.empty();
 }
 
-private Optional<ContextResolvedTable> getPermanentTable(ObjectIdentifier 
objectIdentifier) {
+private Optional<ContextResolvedTable> getPermanentTable(
+ObjectIdentifier objectIdentifier, Optional<Long> timestamp) {

Review Comment:
   Got a warning from my IDE:
   `'Optional<Long>' used as type for parameter 'timestamp'`.
   How about using `@Nullable Long timestamp`?
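   As a small illustration of the suggested pattern (placeholder names, not 
the actual `CatalogManager` code; `javax.annotation.Nullable` is the JSR-305 
annotation Flink already uses):
   
   ```java
   import java.util.Optional;
   
   import javax.annotation.Nullable;
   
   class TimestampParameterStyle {
   
       // Suggested style: a @Nullable parameter instead of Optional<Long>.
       static String describe(@Nullable Long timestamp) {
           return timestamp == null ? "latest version" : "version as of " + timestamp;
       }
   
       // Callers that already hold an Optional unwrap it at the call site.
       public static void main(String[] args) {
           Optional<Long> maybeTimestamp = Optional.of(1_689_235_200_000L);
           System.out.println(describe(maybeTimestamp.orElse(null)));
           System.out.println(describe(null));
       }
   }
   ```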



##
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/IdentifierSnapshotNamespace.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.validate;
+
+import org.apache.calcite.schema.SchemaVersion;
+
+/** Represent the snapshot of the {@link IdentifierNamespace} */

Review Comment:
   Sorry for revisiting this again, but I'm still confused by the comment.
   Maybe we can follow the style of the comment `Common base class for DML 
statement namespaces` on `DmlNamespace`.
   How about changing the comment to `A namespace for statements with a {@code 
FOR SYSTEM_TIME AS OF TIMESTAMP} clause`?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalcite

[GitHub] [flink] liuyongvs commented on pull request #22745: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

2023-07-13 Thread via GitHub


liuyongvs commented on PR #22745:
URL: https://github.com/apache/flink/pull/22745#issuecomment-1634235602

   Hi @snuyanzin, do you have time? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22984: [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes

2023-07-13 Thread via GitHub


XComp commented on PR #22984:
URL: https://github.com/apache/flink/pull/22984#issuecomment-1634211308

   Yup, I agree. I feel like it makes sense to summarize everything in a FLIP. 
I created 
[FLIP-335](https://cwiki.apache.org/confluence/display/FLINK/FLIP-335%3A+removing+flink's+time+classes)
 and started a [discussion 
thread](https://lists.apache.org/thread/48ysrg1rrtl8s1twg9wmx35l201hnc2w).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #22984: [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes

2023-07-13 Thread via GitHub


zentol commented on PR #22984:
URL: https://github.com/apache/flink/pull/22984#issuecomment-1634202032

   > I assumed the [discussion mentioned in 
FLINK-14068](https://lists.apache.org/thread/76yywnwf3lk8qn4dby0vz7yoqx7f7pkj) 
and [FLINK-14068](https://issues.apache.org/jira/browse/FLINK-14068) itself are 
good enough as a conclusion.
   
   Which is a fair point, but I'm just not sure whether a discussion thread 
from 4 years ago, when 2.0 wasn't something tangible yet, meets our current 
bar for making breaking changes :thinking: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-07-13 Thread via GitHub


fredia commented on PR #22890:
URL: https://github.com/apache/flink/pull/22890#issuecomment-1634170838

   @AlexYinHan Thanks for the review and suggestions; I have addressed your 
comments. It would be very nice if you could take another look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-07-13 Thread via GitHub


fredia commented on code in PR #22890:
URL: https://github.com/apache/flink/pull/22890#discussion_r1262496038


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java:
##
@@ -75,4 +77,14 @@ CheckpointStreamFactory resolveCheckpointStorageLocation(
  * @return A toolset for additional operations for state owned by tasks.
  */
 CheckpointStateToolset createTaskOwnedCheckpointStateToolset();
+
+/**
+ * Return {@link 
org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess} if
+ * file merging is enabled. Otherwise, return itself.
+ */
+default CheckpointStorageWorkerView toFileMergingStorage(

Review Comment:
   Thanks for your suggestion. Putting `toFileMergingStorage` into 
`CheckpointStorageWorkerView` is meant to reduce the [type checking in 
StreamTask](https://github.com/apache/flink/pull/22890#discussion_r1251648094).
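   To illustrate the pattern with simplified stand-ins (not the real 
`CheckpointStorageWorkerView` hierarchy), the default method lets the caller 
avoid an explicit type check:
   
   ```java
   /** Simplified stand-ins for the checkpoint storage interfaces. */
   interface StorageView {
       // Most implementations keep the default and return themselves.
       default StorageView toFileMergingStorage(boolean fileMergingEnabled) {
           return this;
       }
   }
   
   final class FsStorage implements StorageView {
       @Override
       public StorageView toFileMergingStorage(boolean fileMergingEnabled) {
           // Only the filesystem storage knows how to wrap itself into the
           // file-merging variant, so callers need no instanceof checks.
           return fileMergingEnabled ? new FsMergingStorage(this) : this;
       }
   }
   
   final class FsMergingStorage implements StorageView {
       private final FsStorage delegate; // the storage being wrapped
   
       FsMergingStorage(FsStorage delegate) {
           this.delegate = delegate;
       }
   }
   ```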
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-07-13 Thread via GitHub


fredia commented on code in PR #22890:
URL: https://github.com/apache/flink/pull/22890#discussion_r1262494966


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** An implementation of file merging checkpoint storage to file systems. */
+public class FsMergingCheckpointStorageAccess extends 
FsCheckpointStorageAccess {
+
+/** The FileMergingSnapshotManager manages files and meta information for 
checkpoints. */
+private final FileMergingSnapshotManager fileMergingSnapshotManager;
+
+/** The identity of the subtask. */
+private final FileMergingSnapshotManager.SubtaskKey subtaskKey;
+
+public FsMergingCheckpointStorageAccess(
+FileSystem fs,
+Path checkpointBaseDirectory,
+Path sharedStateDirectory,
+Path taskOwnedStateDirectory,
+@Nullable Path defaultSavepointDirectory,
+JobID jobId,
+int fileSizeThreshold,
+int writeBufferSize,
+FileMergingSnapshotManager fileMergingSnapshotManager,
+Environment environment)
+throws IOException {
+super(
+fs,
+checkpointBaseDirectory,
+defaultSavepointDirectory,
+jobId,
+fileSizeThreshold,
+writeBufferSize);
+this.fileMergingSnapshotManager = fileMergingSnapshotManager;
+this.subtaskKey =
+new SubtaskKey(
+
OperatorID.fromJobVertexID(environment.getJobVertexId()),
+environment.getTaskInfo());
+initSegmentSnapshotManager(

Review Comment:
   Right, `SegmentSnapshotManager` should be `FileMergingSnapshotManager`.
   👍 I put these two lines of code at the end of the constructor, as you 
suggested.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** An implementation of file merging checkpoint storage to file systems. */
+public class FsMergingCheckpointStorageAccess extends 
FsCheckpointStorageAccess {
+
+/** FileMergingSnapshotManager manage files and meta i

[GitHub] [flink] lsyldliu commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-13 Thread via GitHub


lsyldliu commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1262402227


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalGlobalRuntimeFilterBuilder.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter.BatchExecGlobalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkRuntimeFilterProgram;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+/**
+ * Batch physical RelNode responsible for aggregating all received filters 
into a global filter. See
+ * {@link FlinkRuntimeFilterProgram} for more info.
+ */
+public class BatchPhysicalGlobalRuntimeFilterBuilder extends SingleRel 
implements BatchPhysicalRel {

Review Comment:
   Please override the `explainTerms` method; it gives a meaningful 
description for debugging at runtime. We can print the corresponding 
`buildIndices` and `maxRowCount` fields.
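   For reference, such an override could look roughly like this (assuming the 
usual Calcite `RelWriter` pattern; the field names follow the comment above):
   
   ```java
   // Inside BatchPhysicalGlobalRuntimeFilterBuilder, with
   // org.apache.calcite.rel.RelWriter imported:
   @Override
   public RelWriter explainTerms(RelWriter pw) {
       return super.explainTerms(pw)
               .item("buildIndices", buildIndices)
               .item("maxRowCount", maxRowCount);
   }
   ```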



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecGlobalRuntimeFilterBuilder.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+
+/** Batch {@link ExecNode} for global runtime filter builder. */
+public class BatchExecGlobalRuntimeFilterBuilder extends ExecNodeBase<RowData>
+implements BatchExecNode<RowData> {
+
+private final int maxRowCount;
+
+public BatchExecGlobalRuntimeFilterBuilder(
+ReadableConfig tableConfig,
+List<InputProperty> inputProperties,
+LogicalType outputType,
+String description,
+int maxRowCount) {
+super(
+ExecNodeContext.newNodeId(),
+
ExecNodeContext.newContext(BatchExecLocalRuntimeFilterBuilder.class),
+ExecNodeContext.newPersiste

[GitHub] [flink-web] Limookiplimo opened a new pull request, #663: Refactor grammar on flink-architecture markdown

2023-07-13 Thread via GitHub


Limookiplimo opened a new pull request, #663:
URL: https://github.com/apache/flink-web/pull/663

   Updated grammar on the markdown to improve clarity and accuracy. Added the 
preposition "with" after "work well" and changed "each of" to "with each of" 
for consistency.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-07-13 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-32049.
-
Fix Version/s: 1.18.0
   1.17.2
 Assignee: Rui Fan  (was: Qingsheng Ren)
   Resolution: Fixed

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> ---
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Assignee: Rui Fan
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.17.2
>
> Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip
>
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14   ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14   ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-07-13 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742758#comment-17742758
 ] 

Rui Fan commented on FLINK-32049:
-

Hi [~Sergey Nuyanzin][~renqs], thanks for the report.

We have fixed the bug; in theory, this exception cannot happen again. I'll 
close this JIRA for now; please cc me if it happens again.

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> ---
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip
>
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14   ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14   ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-07-13 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742689#comment-17742689
 ] 

Rui Fan edited comment on FLINK-32049 at 7/13/23 10:27 AM:
---

Merged via:
 bc4c21e47040360aab5bcb0f2c18b907b60e7838
1.17 : c8b6c79ee57050cea8be81f56b707ccbcc0fdf4d


was (Author: fanrui):
Merged via:
 bc4c21e47040360aab5bcb0f2c18b907b60e7838

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> ---
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip
>
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14   ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14   ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui merged pull request #22991: [FLINK-32049][checkpoint] Fix thread-safe bug of channel state executor when some subtasks are closed while other subtasks are starting

2023-07-13 Thread via GitHub


1996fanrui merged PR #22991:
URL: https://github.com/apache/flink/pull/22991


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #22991: [FLINK-32049][checkpoint] Fix thread-safe bug of channel state executor when some subtasks are closed while other subtasks are starting

2023-07-13 Thread via GitHub


1996fanrui commented on PR #22991:
URL: https://github.com/apache/flink/pull/22991#issuecomment-1633970439

   Thanks for the review, merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #22904: [FLINK-32481][table] Support type inference for procedure

2023-07-13 Thread via GitHub


luoyuxia commented on PR #22904:
URL: https://github.com/apache/flink/pull/22904#issuecomment-1633962951

   @LadyForest Thanks for your review. I have addressed your comment in this 
[commit](https://github.com/apache/flink/pull/22904/commits/35a62a159cd1fa45aaa250218f013cd43d8d31d1).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #620: [FLINK-32317] Enrich metadata in CR error field

2023-07-13 Thread via GitHub


darenwkt commented on code in PR #620:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/620#discussion_r1262342704


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java:
##
@@ -129,26 +144,56 @@ public static Optional<String> getSubstringWithMaxLength(String str, int limit)
     private static void enrichMetadata(
             Throwable throwable,
             FlinkResourceException flinkResourceException,
-            int lengthThreshold) {
+            int lengthThreshold,
+            Map<String, String> labelMapper) {
+        if (flinkResourceException.getAdditionalMetadata() == null) {
+            flinkResourceException.setAdditionalMetadata(new HashMap<>());
+        }
+
         if (throwable instanceof RestClientException) {
-            flinkResourceException.setAdditionalMetadata(
-                    Map.of(
+            flinkResourceException
+                    .getAdditionalMetadata()
+                    .put(
                             "httpResponseCode",
                             ((RestClientException) throwable).getHttpResponseStatus().code());
         }
 
         if (throwable instanceof DeploymentFailedException) {
             getSubstringWithMaxLength(
                             ((DeploymentFailedException) throwable).getReason(), lengthThreshold)
                     .ifPresent(
                             reason ->
-                                    flinkResourceException.setAdditionalMetadata(
-                                            Map.of("reason", reason)));
+                                    flinkResourceException
+                                            .getAdditionalMetadata()
+                                            .put("reason", reason));
         }
 
+        labelMapper
+                .entrySet()
+                .forEach(
+                        (entry) -> {
+                            Pattern pattern = Pattern.compile(entry.getKey());
+
+                            org.apache.flink.util.ExceptionUtils.findThrowable(
+                                    throwable, t -> pattern.matcher(t.getMessage()).find())

Review Comment:
   Hi Gyula, thanks for catching this. I have fixed the code and added the 
test, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #22904: [FLINK-32481][table] Support type inference for procedure

2023-07-13 Thread via GitHub


luoyuxia commented on code in PR #22904:
URL: https://github.com/apache/flink/pull/22904#discussion_r1262339392


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java:
##
@@ -133,10 +149,19 @@ private static class DefaultAnnotationHelper {
 // no implementation
 }
 
+@ProcedureHint

Review Comment:
   Thanks for the suggestion. I like it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] PatrickRen commented on a diff in pull request #22924: [FLINK-32404][table] Add catalog modification listener interface and create listener for catalog manager

2023-07-13 Thread via GitHub


PatrickRen commented on code in PR #22924:
URL: https://github.com/apache/flink/pull/22924#discussion_r1262307415


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java:
##
@@ -25,14 +25,22 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
+import org.apache.flink.table.catalog.listener.CatalogModificationListener;
+import 
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

Review Comment:
   Nit: we can remove the `public` keyword before all test cases if we update 
the framework to JUnit 5
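
   For illustration, a package-private test is enough once the class runs on 
JUnit 5 (the test name below is hypothetical):
   ```java
   import org.junit.jupiter.api.Test;

   class EnvironmentTest { // no `public` needed under JUnit 5
       @Test
       void listenerReceivesCatalogEvent() {
           // ...
       }
   }
   ```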



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java:
##
@@ -25,14 +25,22 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
+import org.apache.flink.table.catalog.listener.CatalogModificationListener;
+import 
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

Review Comment:
   Please rewrite assertions using AssertJ (`assertThat`) to align with other 
test cases. 
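
   For example, a hypothetical rewrite (with `events`, `expectedEvents`, and 
`event` standing in for the test's actual values):
   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   // before (JUnit):
   //     assertEquals(expectedEvents, events);
   //     assertTrue(events.contains(event));
   // after (AssertJ):
   assertThat(events).isEqualTo(expectedEvents);
   assertThat(events).contains(event);
   ```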



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##
@@ -164,4 +170,30 @@ public static boolean isLegacyConnectorOptions(
 }
 }
 }
+
+    /** Find and create modification listener list from configuration. */
+    public static List<CatalogModificationListener> findCatalogModificationListenerList(
+            final ReadableConfig configuration, final ClassLoader classLoader) {

Review Comment:
   I found 4 usages of this method, but the test case only covers one of them 
(the `TableEnvironmentImpl` one). What about adding cases for the other usages? 
I'm a bit concerned about whether the `configuration` is passed through 
correctly end-to-end (e.g. when users set it in a SQL Client session, TableEnv, 
etc.), instead of being intercepted by a hidden `new Configuration()` that 
messes everything up.
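
   A sketch of the kind of end-to-end check meant here; the option key and 
factory identifier below are placeholders, not real Flink options:
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   class ListenerConfigSketch {
       // The user-supplied configuration must reach
       // findCatalogModificationListenerList, not be shadowed by a hidden
       // `new Configuration()` along the way.
       static TableEnvironment createEnvWithListener() {
           Configuration conf = new Configuration();
           // hypothetical option key and factory identifier
           conf.setString("table.catalog-modification.listeners", "my-listener-factory");
           return TableEnvironment.create(
                   EnvironmentSettings.newInstance().withConfiguration(conf).build());
       }
   }
   ```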



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32588) Flink ML unittest BoundedPerRoundStreamIterationITCase failed

2023-07-13 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-32588:
-

 Summary: Flink ML unittest BoundedPerRoundStreamIterationITCase 
failed
 Key: FLINK-32588
 URL: https://issues.apache.org/jira/browse/FLINK-32588
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Jiang Xin
 Fix For: ml-2.4.0


[https://github.com/apache/flink-ml/actions/runs/5306457279/jobs/9604069705]
[https://github.com/apache/flink-ml/actions/runs/5166305530/jobs/9306327867]

 

The error message is as below.
{code:java}
Error:  testPerRoundIterationWithState  Time elapsed: 7.192 s  <<< FAILURE!
java.lang.AssertionError: expected:<3> but was:<4>
    at org.junit.Assert.fail(Assert.java:89)
    at org.junit.Assert.failNotEquals(Assert.java:835)
    at org.junit.Assert.assertEquals(Assert.java:647)
    at org.junit.Assert.assertEquals(Assert.java:633)
    at org.apache.flink.test.iteration.BoundedPerRoundStreamIterationITCase.testPerRoundIterationWithState(BoundedPerRoundStreamIterationITCase.java:170)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest commented on a diff in pull request #22904: [FLINK-32481][table] Support type inference for procedure

2023-07-13 Thread via GitHub


LadyForest commented on code in PR #22904:
URL: https://github.com/apache/flink/pull/22904#discussion_r1262153293


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java:
##
@@ -146,6 +171,15 @@ private static <T> T defaultAsNull(FunctionHint hint, Function<FunctionHint, T> accessor)
         return actualValue;
     }
 
+    private static <T> T defaultAsNull(ProcedureHint hint, Function<ProcedureHint, T> accessor) {
+        final T defaultValue = accessor.apply(getDefaultProcedureHintAnnotation());
+        final T actualValue = accessor.apply(hint);
+        if (Objects.deepEquals(defaultValue, actualValue)) {
+            return null;
+        }
+        return actualValue;
+    }
+

Review Comment:
   Nit: What about
   ```java
   private static <T> T defaultAsNull(FunctionHint hint, Function<FunctionHint, T> accessor) {
       return defaultAsNull(hint, DEFAULT_ANNOTATION, accessor);
   }

   private static <T> T defaultAsNull(ProcedureHint hint, Function<ProcedureHint, T> accessor) {
       return defaultAsNull(hint, getDefaultProcedureHintAnnotation(), accessor);
   }

   private static <H extends Annotation, T> T defaultAsNull(
           H hint, H defaultHint, Function<H, T> accessor) {
       final T defaultValue = accessor.apply(defaultHint);
       final T actualValue = accessor.apply(hint);
       if (Objects.deepEquals(defaultValue, actualValue)) {
           return null;
       }
       return actualValue;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32481) Support type inference for procedure

2023-07-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32481:
---
Labels: pull-request-available  (was: )

> Support type inference for procedure
> 
>
> Key: FLINK-32481
> URL: https://issues.apache.org/jira/browse/FLINK-32481
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Currently, FunctionMappingExtractor can only handle type inference for 
> functions. We can extend it so that it can also handle procedures. Since a 
> procedure is much like a function, we can reuse the stack/code of 
> {{FunctionMappingExtractor}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest commented on a diff in pull request #22904: [FLINK-32481][table] Support type inference for procedure

2023-07-13 Thread via GitHub


LadyForest commented on code in PR #22904:
URL: https://github.com/apache/flink/pull/22904#discussion_r1259336884


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:
##
@@ -577,6 +839,18 @@ static TestSpec forTableAggregateFunction(
 new DataTypeFactoryMock(), function));
 }
 
+        static TestSpec forProcedure(Class<? extends Procedure> procedure) {
+            return forProcedure(null, procedure);
+        }
+
+        static TestSpec forProcedure(String description, Class<? extends Procedure> procedure) {

Review Comment:
   Nit: add `@Nullable` annotation



##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:
##
@@ -927,4 +1201,238 @@ private static class DataTypeHintOnScalarFunction 
extends ScalarFunction {
 return null;
 }
 }
+
+@ProcedureHint(
+input = {@DataTypeHint("INT"), @DataTypeHint("STRING")},
+argumentNames = {"i", "s"},
+output = @DataTypeHint("BOOLEAN"))
+private static class FullProcedureHint implements Procedure {
+public Boolean[] call(Object procedureContext, Integer i, String s) {
+return null;
+}
+}
+
+private static class ComplexProcedureHint implements Procedure {
+@ProcedureHint(
+                input = {@DataTypeHint("ARRAY<INT>"), @DataTypeHint(inputGroup = InputGroup.ANY)},
+argumentNames = {"myInt", "myAny"},
+output = @DataTypeHint("BOOLEAN"),
+isVarArgs = true)
+public Boolean[] call(Object procedureContext, Object... o) {
+return null;
+}
+}
+
+@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT"))
+@ProcedureHint(input = @DataTypeHint("BIGINT"), output = 
@DataTypeHint("BIGINT"))
+private static class FullProcedureHints implements Procedure {
+public Number[] call(Object procedureContext, Number n) {
+return null;
+}
+}
+
+@ProcedureHint(output = @DataTypeHint("INT"))
+private static class GlobalOutputProcedureHint implements Procedure {
+@ProcedureHint(input = @DataTypeHint("INT"))
+public Integer[] call(Object procedureContext, Integer n) {
+return null;
+}
+
+@ProcedureHint(input = @DataTypeHint("STRING"))
+public Integer[] call(Object procedureContext, String n) {
+return null;
+}
+}
+
+@ProcedureHint(output = @DataTypeHint("INT"))
+private static class InvalidSingleOutputProcedureHint implements Procedure 
{
+@ProcedureHint(output = @DataTypeHint("TINYINT"))
+public Integer call(Object procedureContext, Number n) {
+return null;
+}
+}
+
+@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT"))
+private static class SplitFullProcedureHints implements Procedure {
+@ProcedureHint(input = @DataTypeHint("BIGINT"), output = 
@DataTypeHint("BIGINT"))
+public Number[] call(Object procedureContext, Number n) {
+return null;
+}
+}
+
+@ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT"))
+private static class InvalidFullOutputProcedureHint implements Procedure {
+@ProcedureHint(input = @DataTypeHint("INT"), output = 
@DataTypeHint("BIGINT"))
+public Number[] call(Object procedureContext, Integer i) {
+return null;
+}
+}
+
+@ProcedureHint(input = @DataTypeHint("INT"), argumentNames = "a", output = 
@DataTypeHint("INT"))
+private static class InvalidFullOutputProcedureWithArgNamesHint implements 
Procedure {
+@ProcedureHint(
+input = @DataTypeHint("INT"),
+argumentNames = "b",
+output = @DataTypeHint("BIGINT"))
+public Number[] call(Object procedureContext, Integer i) {
+return null;
+}
+}
+
+@ProcedureHint(input = @DataTypeHint("INT"))
+private static class InvalidLocalOutputProcedureHint implements Procedure {
+@ProcedureHint(output = @DataTypeHint("INT"))
+public Integer[] call(Object procedureContext, Integer n) {
+return null;
+}
+
+@ProcedureHint(output = @DataTypeHint("STRING"))
+public Integer[] call(Object procedureContext, String n) {
+return null;
+}
+}
+
+@ProcedureHint(
+input = {@DataTypeHint("INT"), @DataTypeHint()},
+output = @DataTypeHint("BOOLEAN"))
+private static class IncompleteProcedureHint implements Procedure {
+public Boolean[] call(Object procedureContext, Integer i1, Integer i2) 
{
+return null;
+}
+}
+
+@ProcedureHint(input = @DataTypeHint("INT"))
+@ProcedureHint(input = @DataTypeHint("BIGINT"))
+private static 

[jira] [Created] (FLINK-32587) The results returned from the CDC SQL query are null or the value was changed unexpectedly

2023-07-13 Thread jasonliangyc (Jira)
jasonliangyc created FLINK-32587:


 Summary: The results returned from the CDC SQL query are null or 
the value was changed unexpectedly
 Key: FLINK-32587
 URL: https://issues.apache.org/jira/browse/FLINK-32587
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.17.1, 1.17.0
Reporter: jasonliangyc
 Attachments: image-2023-07-13-17-35-32-235.png, 
image-2023-07-13-17-37-56-908.png

I created a CDC table as below and then ran the query 'select * from so_cdc' 
through sql-client, and it gave me unexpected results.
{code:java}
CREATE TABLE so_cdc (
   REC_ID STRING,
   Create_Date TIMESTAMP(3),
   PRIMARY KEY (REC_ID) NOT ENFORCED
 ) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = '',
    'port' = '',
    'username' = 'xxx',
    'password' = '',
    'database-name' = '',
    'schema-name' = '',
    'table-name' = 'xxx',
    'scan.startup.mode' = 'latest-offset'
 ); {code}
Running the query for the first time, the data looks normal.

!image-2023-07-13-17-35-32-235.png|width=535,height=141!

 

But after I run the same query multiple times, it gives me unexpected data, 
and I'm sure that these two columns of my CDC source table don't contain this 
data.

!image-2023-07-13-17-37-56-908.png|width=469,height=175!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] PatrickRen commented on a diff in pull request #22869: [FLINK-32403][table] Add database related operations in CatalogManager

2023-07-13 Thread via GitHub


PatrickRen commented on code in PR #22869:
URL: https://github.com/apache/flink/pull/22869#discussion_r1262279404


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java:
##
@@ -70,10 +69,13 @@ public String asSummaryString() {
 
 @Override
 public TableResultInternal execute(Context ctx) {
-Catalog catalog = 
ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName());
 try {
-catalog.alterDatabase(getDatabaseName(), getCatalogDatabase(), 
false);
+ctx.getCatalogManager()
+.alterDatabase(
+getCatalogName(), getDatabaseName(), 
getCatalogDatabase(), false);
 return TableResultImpl.TABLE_RESULT_OK;
+} catch (ValidationException e) {
+throw e;

Review Comment:
   Do we need to catch the `ValidationException` if we just re-throw it without 
any additional action? Same question as in `CreateDatabaseOperation` and 
`DropDatabaseOperation`. 
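
   For illustration (simplified, hypothetical names): a catch clause that only 
re-throws matters solely when a broader handler follows it.
   ```java
   // In isolation, this is a no-op and the clause can be removed:
   try {
       doAlterDatabase();
   } catch (ValidationException e) {
       throw e; // redundant on its own
   }

   // It only has an effect when a wider catch comes after, to keep the
   // original exception from being wrapped:
   try {
       doAlterDatabase();
   } catch (ValidationException e) {
       throw e; // bypass the wrapping below
   } catch (Exception e) {
       throw new TableException("Could not alter database.", e);
   }
   ```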



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22992: [hotfix] Fix typo in JobManagerOptions

2023-07-13 Thread via GitHub


flinkbot commented on PR #22992:
URL: https://github.com/apache/flink/pull/22992#issuecomment-1633890956

   
   ## CI report:
   
   * 72e8439ce49f3ebfe275a653d6ed41b6977de0ee UNKNOWN
   
   
   Bot commands:
   The @flinkbot bot supports the following commands:

    - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator

2023-07-13 Thread Lijie Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742730#comment-17742730
 ] 

Lijie Wang commented on FLINK-32586:


Thanks for your proposal, makes sense to me; assigned to you :) [~xiasun]

> Enable input locality in SimpleExecutionSlotAllocator
> -
>
> Key: FLINK-32586
> URL: https://issues.apache.org/jira/browse/FLINK-32586
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
> Fix For: 1.18.0
>
>
> At present, the AdaptiveBatchScheduler uses the 
> `SimpleExecutionSlotAllocator` to assign slots to executions, but it currently 
> lacks support for input locality, which may increase unnecessary data 
> transmission overhead. In this issue, we aim to enable the 
> `SimpleExecutionSlotAllocator` to support input locality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator

2023-07-13 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-32586:
--

Assignee: xingbe

> Enable input locality in SimpleExecutionSlotAllocator
> -
>
> Key: FLINK-32586
> URL: https://issues.apache.org/jira/browse/FLINK-32586
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
> Fix For: 1.18.0
>
>
> At present, the AdaptiveBatchScheduler uses the 
> `SimpleExecutionSlotAllocator` to assign slots to executions, but it currently 
> lacks support for input locality, which may increase unnecessary data 
> transmission overhead. In this issue, we aim to enable the 
> `SimpleExecutionSlotAllocator` to support input locality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32577) Avoid memory fragmentation when running CI for flink-table-planner module

2023-07-13 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-32577:
--
Description: 
This issue is a sub-issue of FLINK-18356.

 

 

  was:This issue is a sub-issue of FLINK-18356.


> Avoid memory fragmentation when running CI for flink-table-planner module
> -
>
> Key: FLINK-32577
> URL: https://issues.apache.org/jira/browse/FLINK-32577
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.18.0
>
>
> This issue is a sub-issue of FLINK-18356.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] jiaoqingbo opened a new pull request, #22992: [hotfix] Fix typo in JobManagerOptions

2023-07-13 Thread via GitHub


jiaoqingbo opened a new pull request, #22992:
URL: https://github.com/apache/flink/pull/22992

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742728#comment-17742728
 ] 

Yunhong Zheng commented on FLINK-32579:
---

Hi [~jasonliangyc]. Are you using Flink version 1.17.0? I am unable to 
reproduce the problem in Figure 1, as you showed it, in my local UT case. My 
SQL pattern is:
{code:java}
SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF 
T.proctime AS D ON T.b = '?' and T.a = D.id {code}
My rel plan result is:

!image-2023-07-13-17-19-26-972.png|width=708,height=49!

 

BTW, I don't quite understand what the problem in your Figure 2 is. Could you 
clarify? Thanks!

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png
>
>
> *1.* I joined two tables using a lookup join with the below query in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and rows were returned based on only one condition 
> (cdc.product_id = p.id)
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because there 
> is no data in the table (products) where the value of column 'name' is '??', 
> and the execution detail didn't show us the where criterion.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-13 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-32579:
--
Attachment: image-2023-07-13-17-19-26-972.png

> The filter criteria on the lookup table of Lookup join has no effect 
> -
>
> Key: FLINK-32579
> URL: https://issues.apache.org/jira/browse/FLINK-32579
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.17.1
>Reporter: jasonliangyc
>Priority: Major
> Attachments: image-2023-07-12-09-31-18-261.png, 
> image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png, 
> image-2023-07-13-17-19-26-972.png
>
>
> *1.* I joined two tables using a lookup join with the below query in 
> sql-client; the filter criterion (p.name = '??') didn't show up in the 
> execution detail, and rows were returned based on only one condition 
> (cdc.product_id = p.id)
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
> cdc.product_id = p.id
> ; {code}
> !image-2023-07-12-09-31-18-261.png|width=657,height=132!
>  
> *2.* It showed weird results when I changed the query as below, because there 
> is no data in the table (products) where the value of column 'name' is '??', 
> and the execution detail didn't show us the where criterion.
> {code:java}
> select
> cdc.order_id,
> cdc.order_date,
> cdc.customer_name,
> cdc.price,
> p.name
> FROM orders AS cdc
> left JOIN products 
> FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
> where p.name = '??'
> ; {code}
> !image-2023-07-12-09-42-59-231.png|width=684,height=102!
> !image-2023-07-12-09-47-31-397.png|width=685,height=120!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator

2023-07-13 Thread xingbe (Jira)
xingbe created FLINK-32586:
--

 Summary: Enable input locality in SimpleExecutionSlotAllocator
 Key: FLINK-32586
 URL: https://issues.apache.org/jira/browse/FLINK-32586
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: xingbe
 Fix For: 1.18.0


At present, the AdaptiveBatchScheduler uses the `SimpleExecutionSlotAllocator` 
to assign slots to executions, but it currently lacks support for input 
locality, which may increase unnecessary data transmission overhead. In this 
issue, we aim to enable the `SimpleExecutionSlotAllocator` to support input 
locality.
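
A minimal sketch of what input locality means for slot selection (hypothetical 
types, not the actual allocator code): prefer a free slot on a TaskManager that 
already hosts one of the task's input partitions, and fall back to any free 
slot otherwise.
{code:java}
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

final class LocalityAwareSlotPicker {

    interface Slot {
        String host();
    }

    // Pick a slot co-located with an input if possible; otherwise any free slot.
    Optional<Slot> pickSlot(Collection<Slot> freeSlots, Set<String> preferredHosts) {
        Optional<Slot> local =
                freeSlots.stream()
                        .filter(slot -> preferredHosts.contains(slot.host()))
                        .findFirst();
        return local.isPresent() ? local : freeSlots.stream().findFirst();
    }
}
{code}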



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-07-13 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742689#comment-17742689
 ] 

Rui Fan commented on FLINK-32049:
-

Merged via:
 bc4c21e47040360aab5bcb0f2c18b907b60e7838

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> ---
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip
>
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14   ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14   ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

