[GitHub] [flink] flinkbot commented on pull request #22878: [hotfix] [flink-parquet] Print stacktrace when reporting statistics failed

2023-06-26 Thread via GitHub


flinkbot commented on PR #22878:
URL: https://github.com/apache/flink/pull/22878#issuecomment-1608903404

   
   ## CI report:
   
   * dc4b8748c6ef7a58d50311fbc1335b5921dc3efb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jiaoqingbo commented on pull request #22878: [hotfix] [flink-parquet] Print stacktrace when reporting statistics failed

2023-06-26 Thread via GitHub


jiaoqingbo commented on PR #22878:
URL: https://github.com/apache/flink/pull/22878#issuecomment-1608894779

   @wanglijie95 This is a very minor change, could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jiaoqingbo opened a new pull request, #22878: [hotfix] [flink-parquet] Print stacktrace when reporting statistics failed

2023-06-26 Thread via GitHub


jiaoqingbo opened a new pull request, #22878:
URL: https://github.com/apache/flink/pull/22878

   
   
   
   ## What is the purpose of the change
   
   Print stacktrace when reporting statistics failed
   
   
   ## Brief change log
   
   Print stacktrace when reporting statistics failed
   
   
   ## Verifying this change
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22877: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard

2023-06-26 Thread via GitHub


flinkbot commented on PR #22877:
URL: https://github.com/apache/flink/pull/22877#issuecomment-160623

   
   ## CI report:
   
   * d6b1bb85ba0d7f322e99ac87ab6c9f5a0d913e0d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-26 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737472#comment-17737472
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-30859:
-

Merged via apache/flink:149a5e34c1ed8d8943c901a98c65c70693915811

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-26 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-30859.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] tzulitai commented on pull request #22797: [FLINK-30859] Remove all Kafka connector code from main repo

2023-06-26 Thread via GitHub


tzulitai commented on PR #22797:
URL: https://github.com/apache/flink/pull/22797#issuecomment-1608881906

   Merged via 149a5e34c1ed8d8943c901a98c65c70693915811


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dependabot[bot] closed pull request #22631: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard

2023-06-26 Thread via GitHub


dependabot[bot] closed pull request #22631: Bump socket.io-parser and socket.io 
in /flink-runtime-web/web-dashboard
URL: https://github.com/apache/flink/pull/22631


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dependabot[bot] commented on pull request #22631: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard

2023-06-26 Thread via GitHub


dependabot[bot] commented on PR #22631:
URL: https://github.com/apache/flink/pull/22631#issuecomment-1608881623

   Superseded by #22877.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dependabot[bot] opened a new pull request, #22877: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard

2023-06-26 Thread via GitHub


dependabot[bot] opened a new pull request, #22877:
URL: https://github.com/apache/flink/pull/22877

   Bumps [socket.io-parser](https://github.com/socketio/socket.io-parser) and 
[socket.io](https://github.com/socketio/socket.io). These dependencies needed 
to be updated together.
   Updates `socket.io-parser` from 4.0.5 to 4.2.4
   
   Release notes
   Sourced from socket.io-parser's releases: 
https://github.com/socketio/socket.io-parser/releases
   
   4.2.4
   Bug Fixes
   
   ensure reserved events cannot be used as event names (d9db473: 
https://github.com/socketio/socket.io-parser/commit/d9db4737a3c8ce5f1f49ecc8d928a74f3da591f7)
   properly detect plain objects (b0e6400: 
https://github.com/socketio/socket.io-parser/commit/b0e6400c93b5c4aa25e6a629d6448b8627275213)
   
   Links
   
   Diff: https://github.com/socketio/socket.io-parser/compare/4.2.3...4.2.4
   
   4.2.3
   :warning: This release contains an important security fix :warning:
   A malicious client could send a specially crafted HTTP request, 
triggering an uncaught exception and killing the Node.js process:
   TypeError: Cannot convert object to primitive value
  at Socket.emit (node:events:507:25)
  at .../node_modules/socket.io/lib/socket.js:531:14
   
   Please upgrade as soon as possible.
   Bug Fixes
   
   check the format of the event name (3b78117: 
https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3)
   
   Links
   
   Diff: https://github.com/socketio/socket.io-parser/compare/4.2.2...4.2.3
   
   4.2.2
   Bug Fixes
   
   calling destroy() should clear all internal state (22c42e3: 
https://github.com/socketio/socket.io-parser/commit/22c42e3545e4adbc5931276c378f5d62c8b3854a)
   do not modify the input packet upon encoding (ae8dd88: 
https://github.com/socketio/socket.io-parser/commit/ae8dd88995dbd7f89c97e5cc15e5b489fa0efece)
   
   Links
   
   Diff: https://github.com/socketio/socket.io-parser/compare/4.2.1...4.2.2
   
   4.2.1
   Bug Fixes
   
   check the format of the index of each attachment (b5d0cb7: 
https://github.com/socketio/socket.io-parser/commit/b5d0cb7dc56a0601a09b056beaeeb0e43b160050)
   
   Links
   
   Diff: https://github.com/socketio/socket.io-parser/compare/4.2.0...4.2.1
   
   
   
   ... (truncated)
   
   
   Changelog
   Sourced from socket.io-parser's changelog: 
https://github.com/socketio/socket.io-parser/blob/main/CHANGELOG.md
   
   4.2.4 (2023-05-31): 
https://github.com/socketio/socket.io-parser/compare/4.2.3...4.2.4
   Bug Fixes
   
   ensure reserved events cannot be used as event names (d9db473: 
https://github.com/socketio/socket.io-parser/commit/d9db4737a3c8ce5f1f49ecc8d928a74f3da591f7)
   properly detect plain objects (b0e6400: 
https://github.com/socketio/socket.io-parser/commit/b0e6400c93b5c4aa25e6a629d6448b8627275213)
   
   3.4.3 (2023-05-22): 
https://github.com/socketio/socket.io-parser/compare/3.4.2...3.4.3
   Bug Fixes
   
   check the format of the event name (2dc3c92: 
https://github.com/socketio/socket.io-parser/commit/2dc3c92622dad113b8676be06f23b1ed46b02ced)
   
   4.2.3 (2023-05-22): 
https://github.com/socketio/socket.io-parser/compare/4.2.2...4.2.3
   Bug Fixes
   
   check the format of the event name (3b78117: 
https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3)
   
   4.2.2 (2023-01-19): 
https://github.com/socketio/socket.io-parser/compare/4.2.1...4.2.2
   Bug Fixes
   
   calling destroy() should clear all internal state (22c42e3: 
https://github.com/socketio/socket.io-parser/commit/22c42e3545e4adbc5931276c378f5d62c8b3854a)
   do not modify the input packet upon encoding (ae8dd88: 
https://github.com/socketio/socket.io-parser/commit/ae8dd88995dbd7f89c97e5cc15e5b489fa0efece)
   
   3.3.3 (2022-11-09): 
https://github.com/Automattic/socket.io-parser/compare/3.3.2...3.3.3
   Bug Fixes
   
   check the format of the index of each attachment (fb21e42: 
https://github.com/Automattic/socket.io-parser/commit/fb21e422fc193b34347395a33e0f625bebc09983)
   
   3.4.2 (2022-11-09): 
https://github.com/socketio/socket.io-parser/compare/3.4.1...3.4.2
   
   
   ... (truncated)
   
   
   Commits
   
   164ba2a chore(release): 4.2.4 
(https://github.com/socketio/socket.io-parser/commit/164ba2a11edc34c2f363401e9768f9a8541a8b89)
   b0e6400 fix: properly detect plain objects 
(https://github.com/socketio/socket.io-parser/commit/b0e6400c93b5c4aa25e6a629d6448b8627275213)
   d9db473 fix: ensure reserved events cannot be used as event names 
(https://github.com/socketio/socket.io-parser/commit/d9db4737a3c8ce5f1f49ecc8d928a74f3da591f7)
   https://github.com/socketio/socket.io-parser/commit/6a5a004d1e1fd7b7250fdc6fb1

[jira] [Commented] (FLINK-32370) JDBC SQL gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737469#comment-17737469
 ] 

Sergey Nuyanzin commented on FLINK-32370:
-

[~zjureel], [~libenchao] could you please have a look here?

> JDBC SQL gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-06-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737470#comment-17737470
 ] 

Jark Wu commented on FLINK-32444:
-

cc [~lincoln.86xy], [~lsy], [~twalthr] what do you think?

> Enable object reuse for Flink SQL jobs by default
> -
>
> Key: FLINK-32444
> URL: https://issues.apache.org/jira/browse/FLINK-32444
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.18.0
>
>
> Currently, object reuse is not enabled by default for Flink streaming jobs, 
> but is enabled by default for Flink batch jobs. That is not consistent with 
> stream-batch unification. Besides, SQL operators are safe under object 
> reuse, and enabling it is a great performance improvement for SQL jobs. 
> We should also be careful with the Table-DataStream conversion case 
> (StreamTableEnvironment), for which enabling object reuse by default is not safe. 
> Maybe we can just enable it for SQL Client/Gateway and TableEnvironment. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32370) JDBC SQL gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737468#comment-17737468
 ] 

Sergey Nuyanzin commented on FLINK-32370:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50467&view=logs&j=fb37c667-81b7-5c22-dd91-846535e99a97&t=011e961e-597c-5c96-04fe-7941c8b83f23&l=3673

> JDBC SQL gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-06-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-32444:
---

 Summary: Enable object reuse for Flink SQL jobs by default
 Key: FLINK-32444
 URL: https://issues.apache.org/jira/browse/FLINK-32444
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Jark Wu
 Fix For: 1.18.0


Currently, object reuse is not enabled by default for Flink streaming jobs, but 
is enabled by default for Flink batch jobs. That is not consistent with 
stream-batch unification. Besides, SQL operators are safe under object reuse, 
and enabling it is a great performance improvement for SQL jobs. 

We should also be careful with the Table-DataStream conversion case 
(StreamTableEnvironment), for which enabling object reuse by default is not safe. 
Maybe we can just enable it for SQL Client/Gateway and TableEnvironment. 
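
For reference, a minimal sketch of how object reuse is enabled explicitly today 
(the switch this ticket proposes to flip for SQL jobs); illustrative only, using 
the public DataStream API:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Equivalent to 'pipeline.object-reuse: true'. Records passed between
        // chained operators are then not defensively copied.
        env.getConfig().enableObjectReuse();
    }
}
{code}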



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737466#comment-17737466
 ] 

Sergey Nuyanzin commented on FLINK-32441:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50467&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8576

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30774) Introduce flink-utils module

2023-06-26 Thread Alokh P (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737462#comment-17737462
 ] 

Alokh P commented on FLINK-30774:
-

Hi folks. Would love to work on this in case we want to go ahead with this.

> Introduce flink-utils module
> 
>
> Key: FLINK-30774
> URL: https://issues.apache.org/jira/browse/FLINK-30774
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: starter
>
> Currently, generic utility classes like {{Preconditions}} or 
> {{AbstractAutoCloseableRegistry}} are collected in {{flink-core}}. The flaw 
> of this approach is that we cannot use those classes in modules like 
> {{flink-migration-test-utils}}, {{flink-test-utils-junit}}, 
> {{flink-metrics-core}} or {{flink-annotations}}.
> We might want to have a generic {{flink-utils}} module, analogous to 
> {{flink-test-utils}}, that collects Flink-independent utility functionality 
> and can be accessed by any module {{flink-core}} depends on, to make this 
> utility functionality available in any Flink-related module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-30629:

Fix Version/s: 1.18.0
   1.17.2
   (was: 1.17.0)

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.17.2
>
> Attachments: ClientHeartbeatTestLog.txt, 
> logs-cron_azure-test_cron_azure_core-1685497478.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-30629.
-
Resolution: Fixed

Thanks [~Jiangang] 
The PR was merged a couple of weeks ago and there have been no new cases so far, 
so I'm closing the issue.

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt, 
> logs-cron_azure-test_cron_azure_core-1685497478.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-30629:
---

Assignee: Liu  (was: Weijie Guo)

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt, 
> logs-cron_azure-test_cron_azure_core-1685497478.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on pull request #22834: [FLINK-32260]-table-Add-ARRAY_SLICE-function

2023-06-26 Thread via GitHub


snuyanzin commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1608860964

   This is not a complete e2e test
   
   to reproduce: 
   1. build the project, e.g. `mvn clean install -DskipTests`, from the project 
root folder
   2. go to build-target: `cd build-target`
   3. make sure that there is no active cluster: `bin/stop-cluster.sh` 
   should return something like 
   ```
   No taskexecutor daemon to stop on host .
   No standalonesession daemon to stop on host .
   ```
   4. start the cluster: `bin/start-cluster.sh`
   5. open the sql client: `bin/sql-client.sh`
   6. execute the query above
   there is an exception like 
   ```
   Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT 
NULL, however, a null value is being written into it. You can set job 
configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this 
exception and drop such records silently.
at 
org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processNotNullConstraint(ConstraintEnforcer.java:261)
at 
org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:241)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at StreamExecCalc$23.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
   
   ```
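   
   As an aside, a hedged sketch of the workaround named in the exception text, 
assuming a `TableEnvironment` called `tEnv`:
   
   ```
   // Drop records violating the NOT NULL constraint instead of failing,
   // per the hint in the exception message above.
   tEnv.getConfig().getConfiguration()
           .setString("table.exec.sink.not-null-enforcer", "DROP");
   ```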


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wangyang0918 commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-06-26 Thread via GitHub


wangyang0918 commented on code in PR #22509:
URL: https://github.com/apache/flink/pull/22509#discussion_r1243187163


##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -231,6 +231,31 @@ public class YarnConfigOptions {
 .withDescription(
 "A comma-separated list of tags to apply to the 
Flink YARN application.");
 
+/**
+ * Users and groups to give VIEW access.
+ * 
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html
+ */
+public static final ConfigOption<String> APPLICATION_VIEW_ACLS =
+key("yarn.view.acls")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"Users and groups to give VIEW access. The ACLs are of format"
++ " comma-separated-users<space>comma-separated-groups."
++ " Wildcard ACL is also supported. The only valid wildcard ACL "
++ " is *, which grants permission to all users and groups.");
+
+/** Users and groups to give MODIFY access. */
+public static final ConfigOption<String> APPLICATION_MODIFY_ACLS =
+key("yarn.modify.acls")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"Users and groups to give MODIFY access. The ACLs are of format"
++ " comma-separated-users<space>comma-separated-groups."
++ " Wildcard ACL is also supported. The only valid wildcard ACL "

Review Comment:
   ```suggestion
   + " Wildcard ACL is also supported. The 
only valid wildcard ACL"
   ```
   Redundant space.



##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -231,6 +231,31 @@ public class YarnConfigOptions {
 .withDescription(
 "A comma-separated list of tags to apply to the 
Flink YARN application.");
 
+/**
+ * Users and groups to give VIEW access.
+ * 
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html
+ */
+public static final ConfigOption<String> APPLICATION_VIEW_ACLS =
+key("yarn.view.acls")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"Users and groups to give VIEW access. The ACLs are of format"
++ " comma-separated-users<space>comma-separated-groups."
++ " Wildcard ACL is also supported. The only valid wildcard ACL "

Review Comment:
   ```suggestion
   + " Wildcard ACL is also supported. The 
only valid wildcard ACL"
   ```
   Redundant space.
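   
   Side note for readers: a hedged sketch of how such an option would typically 
be consumed, using the option name from this diff (the value shown is 
illustrative):
   
   ```
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.yarn.configuration.YarnConfigOptions;
   
   Configuration conf = new Configuration();
   // Illustrative value: users "userA,userB" and group "groupA",
   // separated by a space.
   conf.setString("yarn.view.acls", "userA,userB groupA");
   String viewAcls = conf.get(YarnConfigOptions.APPLICATION_VIEW_ACLS);
   ```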



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #22852: [FLINK-32420][connectors/common] Improve the watermark aggregation performance

2023-06-26 Thread via GitHub


1996fanrui commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1243183753


##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##
@@ -642,7 +643,32 @@ private static class WatermarkAggregator<T> {
  * Optional.empty()} otherwise.
  */
 public Optional<Watermark> aggregate(T key, Watermark watermark) {
-watermarks.put(key, watermark);
+Watermark oldWatermark = watermarks.put(key, watermark);
+// Step (1): Update the latest watermark of current key as the 
aggregatedWatermark
+// directly if it is less than the aggregatedWatermark.
+if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) 
{
+aggregatedWatermark = watermark;
+return Optional.of(aggregatedWatermark);
+}
+
+// Step(2): The aggWM won't change when these conditions are met, 
so return directly:
+// case1. The latest WM of the current key isn't changed
+// case2. When oldWatermark isn't null and is greater than aggWm, 
it means that aggWm
+// comes from other keys. If new WM is greater than or equal to 
aggWm, then aggWm must
+// not change.
+// case3. When oldWatermark is null and {@link watermarks} has 
other keys, it means that
+// aggWm comes from other keys. If new WM is greater than or equal 
to aggWm, then aggWm
+// must not change.
+// Note: step(1) have returned when `watermark < 
aggregatedWatermark`, so all calls

Review Comment:
   Thanks for the review and good suggestion!
   
   Your suggestion is more concise, updated.
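   
   For readers skimming the thread, a self-contained sketch of the fast-path 
idea under discussion (a hypothetical standalone aggregator over plain longs, 
not the actual `SourceCoordinator` code):
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;
   
   // Hypothetical model of the optimization: avoid rescanning all per-key
   // watermarks when the new value provably cannot change the minimum.
   class MinWatermarkAggregator<K> {
       private final Map<K, Long> watermarks = new HashMap<>();
       private long aggregated = Long.MIN_VALUE;
   
       Optional<Long> aggregate(K key, long wm) {
           Long old = watermarks.put(key, wm);
           if (wm < aggregated) { // step (1): new global minimum, update directly
               aggregated = wm;
               return Optional.of(aggregated);
           }
           // step (2): wm >= aggregated here; the minimum cannot change if this
           // key's watermark is unchanged, or if the old minimum belonged to
           // another key.
           if (old != null && (old == wm || old > aggregated)) {
               return Optional.empty();
           }
           // Fallback: full rescan (this key may have held the unique minimum).
           long newMin = watermarks.values().stream()
                   .mapToLong(Long::longValue).min().orElse(wm);
           if (newMin == aggregated) {
               return Optional.empty();
           }
           aggregated = newMin;
           return Optional.of(aggregated);
       }
   }
   ```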



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks

2023-06-26 Thread via GitHub


1996fanrui commented on code in PR #22845:
URL: https://github.com/apache/flink/pull/22845#discussion_r1243180040


##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##
@@ -270,8 +269,9 @@ public void start() throws Exception {
 @Override
 public void close() throws Exception {
 LOG.info("Closing SourceCoordinator for source {}.", operatorName);
+closeQuietly(context);
 if (started) {
-closeAll(asList(context, enumerator), Throwable.class);
+closeQuietly(enumerator);

Review Comment:
   Thanks for the review, good catch! updated.
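   
   For context, a hedged sketch of the close-quietly pattern being applied here 
(an illustrative standalone helper, not necessarily Flink's exact utility):
   
   ```
   // Best-effort close that never throws, so that a failure while closing
   // one resource cannot prevent the cleanup of the remaining ones.
   static void closeQuietly(AutoCloseable closeable) {
       if (closeable == null) {
           return;
       }
       try {
           closeable.close();
       } catch (Throwable ignored) {
           // swallowed by design; callers are already on a cleanup path
       }
   }
   ```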



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #22832: [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway

2023-06-26 Thread via GitHub


FangYongs commented on code in PR #22832:
URL: https://github.com/apache/flink/pull/22832#discussion_r1243176991


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java:
##
@@ -57,19 +58,24 @@ public class ResultInfo {
private final List<ColumnInfo> columnInfos;
private final List<RowData> data;
 private final RowFormat rowFormat;
+private final ZoneId zoneId;
 
-ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data, RowFormat 
rowFormat) {
+ResultInfo(
+List<ColumnInfo> columnInfos, List<RowData> data, RowFormat 
rowFormat, ZoneId zoneId) {
 this.columnInfos = columnInfos;
 this.data = data;
 this.rowFormat = rowFormat;
+this.zoneId = zoneId;
 }
 
 public static ResultInfo createResultInfo(ResultSet resultSet, RowFormat 
rowFormat) {
 Preconditions.checkArgument(resultSet.getResultType() != 
ResultSet.ResultType.NOT_READY);
List<RowData> data = resultSet.getData();
+ZoneId zoneId = ZoneId.systemDefault();
 
 switch (rowFormat) {
 case JSON:
+zoneId = ((ResultSetImpl) resultSet).getZoneId();

Review Comment:
   When `RowDataToStringConverterImpl` converts rowdata to string, it 
converts timestamp_ltz data to string according to the local zone id in 
`TimestampToStringCastRule`. In addition to passing the zone id to the jdbc 
driver, adding a converter similar to `RowDataToStringConverter` for the JSON 
format and converting the data first in the gateway may be a better way. What do 
you think?
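   
   For intuition, a tiny standalone sketch of the zone-dependent rendering at 
issue, using plain `java.time` rather than Flink's converters:
   
   ```
   import java.time.Instant;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;
   
   public class LtzRenderingDemo {
       public static void main(String[] args) {
           // A TIMESTAMP_LTZ value is an instant; its string form depends on
           // the session time zone, so the gateway and the driver must agree
           // on the ZoneId before formatting.
           Instant instant = Instant.parse("2023-06-26T00:00:00Z");
           DateTimeFormatter fmt =
                   DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
           System.out.println(fmt.format(instant.atZone(ZoneId.of("UTC"))));
           // prints 2023-06-26 00:00:00
           System.out.println(
                   fmt.format(instant.atZone(ZoneId.of("Asia/Shanghai"))));
           // prints 2023-06-26 08:00:00
       }
   }
   ```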



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wangyang0918 commented on pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-06-26 Thread via GitHub


wangyang0918 commented on PR #22509:
URL: https://github.com/apache/flink/pull/22509#issuecomment-1608825216

   > Based on the 
[documentation](https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/cm_mc_yarn_acl.html#concept_activate_yarn_ACLs)
 here, it looks like yarn.acl.enable should be set to true to enable YARN ACLs. 
Just curious, whether it is set in your yarn-site.xml? Interesting, that it 
works after enabling Kerberos, is it because the other cluster or environment 
have this config set?
   
   Yes. I already configured `yarn.acl.enable = true` and `yarn.admin.acl 
= hadoop` in the YARN cluster.
   
   Using Knox to access the YARN UI makes the ACL configurations take effect, 
so it is not related to Kerberos.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32260) Add SLICE support in SQL & Table API

2023-06-26 Thread Hanyu Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737443#comment-17737443
 ] 

Hanyu Zheng commented on FLINK-32260:
-

[~Sergey Nuyanzin] thank you for your suggestions.

Let us talk about this with [~bvarghese] and [~twalthr] 

> Add SLICE support in SQL & Table API
> 
>
> Key: FLINK-32260
> URL: https://issues.apache.org/jira/browse/FLINK-32260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Implement the {{array_slice}} function to extract a subset of elements from 
> an array.
> Description: The {{array_slice}} function in the ETL pipeline allows you to 
> extract a subset of elements from an array based on the specified starting 
> index and length. It supports both positive and negative indices, where 
> positive indices start from 1 (the first element) and negative indices start 
> from the end of the array (-1 being the last element).
> Syntax:
>  
> {code:java}
> array_slice[x: array, start: int, length: int] -> array{code}
> Arguments:
>  * {{{}x{}}}: The input array from which to extract the subset of elements.
>  * {{{}start{}}}: The starting index of the subset. If positive, it 
> represents the index from the beginning of the array. If negative, it 
> represents the index from the end of the array (-1 being the last element).
>  * {{{}length{}}}: The length of the subset to be extracted.
> Returns: An array containing the subset of elements extracted from the input 
> array {{{}x{}}}. The subset starts from the specified {{start}} index and has 
> the specified {{{}length{}}}.
> Examples:
>  # Extracting a subset from an array starting from index 2 with length 2:
>  
> {code:java}
> array_slice[array[1, 2, 3, 4], 2, 2]  Output: [2, 3]{code}
>      2. Extracting a subset from an array starting from the second-to-last 
> element with length 2:
>  
> {code:java}
> array_slice[array[1, 2, 3, 4], -2, 2]
> Output: [3, 4]{code}
> see also:
> spark:[https://spark.apache.org/docs/latest/api/sql/index.html#slice]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32263) Add ELT support in SQL & Table API

2023-06-26 Thread Hanyu Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737441#comment-17737441
 ] 

Hanyu Zheng commented on FLINK-32263:
-

[~jark] Thank you, but how does f0 stand for a mixed type?

> Add ELT support in SQL & Table API
> --
>
> Key: FLINK-32263
> URL: https://issues.apache.org/jira/browse/FLINK-32263
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Implement the elt function to extract the n-th input value from a list of 
> inputs.
> Description:
> The elt function in the ETL pipeline extracts the value at the n-th position 
> from a list of input values. It is similar to array indexing, where the first 
> element is at position 1. This function provides a convenient way to retrieve 
> specific elements from a list of inputs.
> Syntax:
>  
> {code:java}
> elt[n: int, *inputs: str] -> str or None{code}
>  
> Arguments:
> n: The index of the input value to extract. It should be a positive integer.
> *inputs: Variable-length arguments representing the list of inputs.
> Returns:
> The value at the n-th position in the list of inputs. If the index exceeds 
> the length of the array, the function returns NULL. 
> Examples:
> Retrieving the second element from a list of strings:
> {code:java}
> elt(2, 'scala', 'java')
> Output: 'java'{code}
> Retrieving the second element from a list of mixed types:
> {code:java}
> result = elt(2, 'a', 1)
> Output: 1{code}
> See also:
>  
> spark:[https://spark.apache.org/docs/latest/api/sql/index.html#elt]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] PatrickRen commented on pull request #22867: [BP-1.16][FLINK-32392][ci] Invalidate Maven repo cache on Azure every year to reduce its disk usage

2023-06-26 Thread via GitHub


PatrickRen commented on PR #22867:
URL: https://github.com/apache/flink/pull/22867#issuecomment-1608790353

   @snuyanzin I don't think adding -U would help, as the root cause is actually
   
   ```
   Could not transfer artifact org.opentest4j:opentest4j:jar:1.1.1 from/to 
google-maven-central 
(https://maven-central-eu.storage-download.googleapis.com/maven2/): Entry 
[id:28][route:{s}->https://maven-central-eu.storage-download.googleapis.com:443][state:null/]
 has not been leased from this pool
   ```
   which happened when downloading the dependency. 
   
   I found [FLINK-16947](https://issues.apache.org/jira/browse/FLINK-16947), 
which shows the same error messages. Probably it's another bug in the HTTP 
client that Maven Wagon depends on 😞 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32443) Translate "State Processor API" page into Chinese

2023-06-26 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-32443:
--

 Summary: Translate "State Processor API" page into Chinese
 Key: FLINK-32443
 URL: https://issues.apache.org/jira/browse/FLINK-32443
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor, chinese-translation, Documentation
Affects Versions: 1.18.0
Reporter: Yanfei Lei


The page URL is 
[https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/libs/state_processor_api/]

The markdown file is located in docs/content.zh/docs/libs/state_processor_api.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32443) Translate "State Processor API" page into Chinese

2023-06-26 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-32443:
--

Assignee: Yanfei Lei

> Translate "State Processor API" page into Chinese
> -
>
> Key: FLINK-32443
> URL: https://issues.apache.org/jira/browse/FLINK-32443
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, chinese-translation, Documentation
>Affects Versions: 1.18.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>
> The page URL is 
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/libs/state_processor_api/]
> The markdown file is located in 
> docs/content.zh/docs/libs/state_processor_api.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on a diff in pull request #22831: [FLINK-32388]Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-06-26 Thread via GitHub


luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1243092293


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##
@@ -456,7 +456,8 @@ private DataStreamSink createBatchCompactSink(
 new PartitionCommitPolicyFactory(
 
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
 
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
-
conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
+
conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));

Review Comment:
   ```suggestion
   
conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));
   ```



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##
@@ -610,7 +611,8 @@ private DataStreamSink createBatchNoCompactSink(
 new PartitionCommitPolicyFactory(
 
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
 
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
-
conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME)));
+
conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS)));

Review Comment:
   dito



##
docs/content.zh/docs/connectors/table/filesystem.md:
##
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements 
PartitionTimeExtractor {
 String
 The partition commit policy class implementing the PartitionCommitPolicy interface. Only used under the custom commit policy.
 
+
+sink.partition-commit.policy.class.parameters
+(none)
+String
+ The string parameters passed to the custom commit policy class; multiple parameters must be separated by semicolons, e.g. 'param1;param2',

Review Comment:
   ```suggestion
 The parameters passed to the constructor of the custom commit policy class; multiple parameters are separated by semicolons, e.g. 'param1;param2',
   ```



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
 successFileName, fsSupplier.get());
 case PartitionCommitPolicy.CUSTOM:
 try {
-return (PartitionCommitPolicy)
-
cl.loadClass(customClass).newInstance();
-} catch (ClassNotFoundException
-| IllegalAccessException
-| InstantiationException e) {
+if (parameters != null && 
!parameters.isEmpty()) {
+String[] paramStrings = 
parameters.toArray(new String[parameters.size()]);

Review Comment:
   ```suggestion
   String[] paramStrings = 
parameters.toArray(new String[0]);
   ```



##
docs/content/docs/connectors/table/filesystem.md:
##
@@ -470,6 +470,14 @@ The partition commit policy defines what action is taken 
when partitions are com
 String
 The partition commit policy class for implement 
PartitionCommitPolicy interface. Only work in custom commit policy.
 
+
+sink.partition-commit.policy.class.parameters
+optional
+yes
+(none)
+String
+The custom commit policy class can accept a string argument, which 
can include multiple arguments separated by semicolons. For example, 
'param1;param2'. The string argument will be split into a list (['param1', 
'param2']) and passed as constructor parameters to the custom commit policy 
class.

Review Comment:
   ```suggestion
   The parameters passed to the constructor of the custom commit 
policy, with multiple parameters separated by semicolons, such as 
'param1;param2'. The configuration value will be split into a list (['param1', 
'param2']) and passed to the constructor of the custom commit policy class.
   ```
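   
   To make the described mechanism concrete, a hypothetical custom policy under 
the semantics above (class and parameter names are illustrative; the sketch 
assumes the factory passes the split values as individual constructor 
arguments):
   
   ```
   import org.apache.flink.connector.file.table.PartitionCommitPolicy;
   
   // Hypothetical example: with
   //   'sink.partition-commit.policy.kind' = 'custom',
   //   'sink.partition-commit.policy.class.parameters' = 'param1;param2',
   // the constructor would receive "param1" and "param2".
   public class MyNotifyPolicy implements PartitionCommitPolicy {
       private final String endpoint;
       private final String topic;
   
       public MyNotifyPolicy(String endpoint, String topic) {
           this.endpoint = endpoint;
           this.topic = topic;
       }
   
       @Override
       public void commit(Context context) throws Exception {
           // custom commit action, e.g. notify an external system that the
           // partition described by the context has been committed
       }
   }
   ```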



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
 successFileName, fsSupplier.get());
 case PartitionCommitPolicy.CUSTOM:
 try {
-return (PartitionCommitPolicy)
-
cl.loadClass(custo

[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260]-table-Add-ARRAY_SLICE-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1608743900

   @snuyanzin for these two test cases
   ```
   SELECT array_slice(array[1, 2, 3], -5, 1);
   SELECT array_slice(array[1, 2, 3], 1, -1);
   ```
   I tested them; they do not fail with an exception, and the result of both 
cases is null.
   [screenshot: 
https://github.com/apache/flink/assets/135176127/01e0632b-257c-479a-8892-38c1262799c2]
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-28706) FLIP-248: Introduce dynamic partition pruning

2023-06-26 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-28706:


Assignee: godfrey he

> FLIP-248: Introduce dynamic partition pruning
> -
>
> Key: FLINK-28706
> URL: https://issues.apache.org/jira/browse/FLINK-28706
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Hive, Runtime / Coordination, Table SQL / 
> Planner, Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
> Fix For: 1.16.0
>
>
> Please refer to 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
>  for more details



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19830) Properly implements processing-time temporal table join

2023-06-26 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737424#comment-17737424
 ] 

Dong Lin commented on FLINK-19830:
--

Hi [~rmetzger] [~jark] , FYI, FLIP-326 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join]
 has been proposed to address this JIRA. It will be great if you have time to 
review this FLIP.

> Properly implements processing-time temporal table join
> ---
>
> Key: FLINK-19830
> URL: https://issues.apache.org/jira/browse/FLINK-19830
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>
> The exsiting TemporalProcessTimeJoinOperator has already supported temporal 
> table join.
>  However, the semantic of this implementation is problematic, because the 
> join processing for left stream doesn't wait for the complete snapshot of 
> temporal table, this may mislead users in production environment.
> Under the processing time temporal join semantics, to get the complete 
> snapshot of temporal table may need introduce new mechanism in FLINK SQL in 
> the future.
> **Background** : 
>  * The reason why we turn off the switch[1] for `FOR SYSTEM_TIME AS OF` 
> syntax for *temporal table join* is only the semantic consideration as above.
>  * The reason why we turn on *temporal table function*  is that it has been 
> alive for a long time, thus although it exists same semantic problem, but we 
> still support it from the perspective of compatibility.
> [1] 
> [https://github.com/apache/flink/blob/4fe9f525a92319acc1e3434bebed601306f7a16f/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java#L257]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32276) After adding the where condition to the flink lookup left join, the joinType becomes innerJoin

2023-06-26 Thread Xianxun Ye (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianxun Ye closed FLINK-32276.
--
Resolution: Not A Bug

[~jark]  Thanks for your response. Make sense to me.

> After adding the where condition to the flink lookup left join, the joinType 
> becomes innerJoin
> --
>
> Key: FLINK-32276
> URL: https://issues.apache.org/jira/browse/FLINK-32276
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.17.1
>Reporter: Xianxun Ye
>Priority: Major
> Attachments: lookup_join_inner_join_type.jpg
>
>
> *How to reproduce:*
> {code:java}
> CREATE TABLE dim (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'users'
> );
> create table actions (
>   id bigint,
>   proc as proctime(),
>   primary key (id) not enforced
> ) with (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'actions'
> );
> select
>   *
> from
>   actions
>   left join dim for system_time as of actions.proc on actions.id = dim.id
> where
>   dim.age > 10; {code}
> When running the above SQL, the LookupJoin operator is executed based on 
> InnerJoin, contrary to the SQL's left join.
> If I remove the where condition(dim.age>10), the LookupJoin's joinType is 
> LeftOuterJoin.
> Is this a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-26 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737413#comment-17737413
 ] 

loyi commented on FLINK-23190:
--

[~heigebupahei]   Looking forward to your feedback. :)

 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in 
> active yarn mode, and jobs with smaller source parallelism often cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering slots with the SlotManager, the current 
> processing logic chooses the first pendingSlot that meets its resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below a processing 
> operator's. A simple, feasible idea is to {color:#de350b}partition{color} the 
> current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g., let 
> the AllocationID carry the detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager provides 4 slots at a time, and each group gets 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3; the patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there 
> other points that have not been considered, or does it conflict with future 
> plans? Sorry for my poor English.
>  
>  
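
A condensed, runnable sketch of the proposed grouping (the types here are hypothetical stand-ins, not Flink's actual SlotManager API), reproducing the allocation described above:

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProportionalSlotAssignment {
    // groups = pendingSlots partitioned by the set of job vertices they would host
    static Map<String, List<String>> assign(List<String> groups, List<String> tms, int slotsPerTm) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        groups.forEach(g -> result.put(g, new ArrayList<>()));
        for (String tm : tms) {
            // each TM offers slotsPerTm slots; hand them out round-robin across the groups
            for (int s = 0; s < slotsPerTm; s++) {
                result.get(groups.get(s % groups.size())).add(tm);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("[ABCD]", "[BCD]", "[CD]", "[D]");
        List<String> tms = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            tms.add("tm" + i);
        }
        // 10 TMs x 4 slots each: every group lands exactly once on every TM
        assign(groups, tms, 4).forEach((g, where) -> System.out.println(g + " -> " + where));
    }
}
{code}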



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] KarmaGYZ commented on a diff in pull request #22836: [FLINK-32191][netty] support set keepalive options to NettyClient.

2023-06-26 Thread via GitHub


KarmaGYZ commented on code in PR #22836:
URL: https://github.com/apache/flink/pull/22836#discussion_r1243040416


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
+
+import jdk.net.ExtendedSocketOptions;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyClient}. */
+public class NettyClientTest {
+@Test
+void testSetKeepaliveOptionWithNio() throws Exception {
+final Configuration config = new Configuration();
+config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "nio");
+
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
+
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
+config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);
+
+try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
+final NettyClient client = createNettyClient(config, clientPort);
+Map<String, Object> options =
+
client.getBootstrap().config().options().entrySet().stream()
+.collect(Collectors.toMap(e -> e.getKey().name(), 
Map.Entry::getValue));
+if (keepaliveForNioConfigurable()) {
+assertThat(options)
+.containsEntry(NettyClient.NIO_TCP_KEEPIDLE_KEY, 300)
+.containsEntry(NettyClient.NIO_TCP_KEEPINTERVAL_KEY, 
10)
+.containsEntry(NettyClient.NIO_TCP_KEEPCOUNT_KEY, 8);
+} else {
+assertThat(options)
+.doesNotContainKeys(
+NettyClient.NIO_TCP_KEEPIDLE_KEY,
+NettyClient.NIO_TCP_KEEPINTERVAL_KEY,
+NettyClient.NIO_TCP_KEEPCOUNT_KEY);
+}

Review Comment:
   We can split it into two separate tests, with 
`assumeThat(keepaliveForNioConfigurable()).isTrue();` and 
`assumeThat(keepaliveForNioConfigurable()).isFalse();` respectively.
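
   A sketch of that split, reusing the setup from the test above (it assumes the PR's `createNettyClient` and `keepaliveForNioConfigurable` helpers, plus `import static org.assertj.core.api.Assumptions.assumeThat`):

```java
@Test
void testKeepaliveOptionsAppliedWhenNioSupportsThem() throws Exception {
    assumeThat(keepaliveForNioConfigurable()).isTrue();
    try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
        final NettyClient client = createNettyClient(keepaliveConfig(), clientPort);
        assertThat(clientOptions(client))
                .containsEntry(NettyClient.NIO_TCP_KEEPIDLE_KEY, 300)
                .containsEntry(NettyClient.NIO_TCP_KEEPINTERVAL_KEY, 10)
                .containsEntry(NettyClient.NIO_TCP_KEEPCOUNT_KEY, 8);
    }
}

@Test
void testKeepaliveOptionsIgnoredWhenNioDoesNotSupportThem() throws Exception {
    assumeThat(keepaliveForNioConfigurable()).isFalse();
    try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
        final NettyClient client = createNettyClient(keepaliveConfig(), clientPort);
        assertThat(clientOptions(client))
                .doesNotContainKeys(
                        NettyClient.NIO_TCP_KEEPIDLE_KEY,
                        NettyClient.NIO_TCP_KEEPINTERVAL_KEY,
                        NettyClient.NIO_TCP_KEEPCOUNT_KEY);
    }
}

// helpers extracted from the original test body (names are illustrative):
private static Configuration keepaliveConfig() {
    final Configuration config = new Configuration();
    config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "nio");
    config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
    config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
    config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);
    return config;
}

private static Map<String, Object> clientOptions(NettyClient client) {
    return client.getBootstrap().config().options().entrySet().stream()
            .collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
}
```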



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
+
+import jdk.net.ExtendedSocketOptions;
+import org.junit.jupiter.api.Test;
+
+import java.net.

[GitHub] [flink] hanyuzheng7 commented on pull request #22785: [FLINK-32259]-table-Add-ARRAY_JOIN-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on PR #22785:
URL: https://github.com/apache/flink/pull/22785#issuecomment-1608538200

   @dawidwys I have addressed all the comments; you can do the second pass. 
Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22785: [FLINK-32259]-table-Add-ARRAY_JOIN-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22785:
URL: https://github.com/apache/flink/pull/22785#discussion_r1242997039


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayOfStringArgumentTypeStrategy.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Strategy for an argument that must be an array of strings. */
+@Internal
+public final class ArrayOfStringArgumentTypeStrategy implements 
ArgumentTypeStrategy {
+
+private final String constraintMessage;
+
+public ArrayOfStringArgumentTypeStrategy(String constraintMessage) {
+this.constraintMessage = constraintMessage;
+}
+
+@Override
+public Optional<DataType> inferArgumentType(
+CallContext callContext, int argumentPos, boolean throwOnFailure) {
+
+final List<DataType> actualDataTypes = 
callContext.getArgumentDataTypes();
+DataType actualType = actualDataTypes.get(argumentPos);
+
+// Check if the type is an array and its element is string
+if (actualType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ARRAY
+&& (((ArrayType) 
actualType.getLogicalType()).getElementType().getTypeRoot()
+== LogicalTypeRoot.VARCHAR
+|| ((ArrayType) 
actualType.getLogicalType()).getElementType().getTypeRoot()
+== LogicalTypeRoot.CHAR)) {
+return Optional.of(actualType);
+}

Review Comment:
   fixed it
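
   For reference, the check above could be condensed with Flink's `LogicalType#is(...)` helpers (this assumes an additional `LogicalTypeFamily` import; the `CHARACTER_STRING` family covers both `CHAR` and `VARCHAR`), as a drop-in for the condition in the method above:

```java
final LogicalType logicalType = actualType.getLogicalType();
// ARRAY whose element type is any character string (CHAR or VARCHAR)
if (logicalType.is(LogicalTypeRoot.ARRAY)
        && ((ArrayType) logicalType).getElementType().is(LogicalTypeFamily.CHARACTER_STRING)) {
    return Optional.of(actualType);
}
```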



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22785: [FLINK-32259]-table-Add-ARRAY_JOIN-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22785:
URL: https://github.com/apache/flink/pull/22785#discussion_r1242996736


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1408,6 +1409,24 @@ public OutType arrayUnion(InType array) {
 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
 }
 
+/**
+ * Returns a string that represents the concatenation of the elements in the given array,
+ * whose elements must be of string data type. The `delimiter` is a string that separates
+ * each pair of consecutive elements of the array. The optional `nullReplacement` is a string
+ * that replaces null elements in the array.
+ *
+ * Returns null if the input array, the delimiter, or the nullReplacement is null.
+ */
+public OutType arrayJoin(InType delimiter, InType... nullReplacement) {

Review Comment:
   fixed it.
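
   A usage sketch of the documented semantics (the column name `tags` is illustrative, and this assumes the signature proposed in this PR plus `import static org.apache.flink.table.api.Expressions.$`):

```java
// join an ARRAY<STRING> column "tags" with ", " as the delimiter
table.select($("tags").arrayJoin(", "));
// additionally replace null elements with "n/a"
table.select($("tags").arrayJoin(", ", "n/a"));
```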



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -285,6 +286,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_JOIN =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_JOIN")
+.kind(SCALAR)
+.inputTypeStrategy(
+varyingSequence(
+new 
ArrayOfStringArgumentTypeStrategy("ARRAY"),
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING)))

Review Comment:
   fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1608534943

   @dawidwys I have addressed all the comments; you can do the second pass. 
Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] snuyanzin commented on a diff in pull request #654: Add JDBC v3.1.1 for Flink 1.16.x and Flink 1.17.x

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #654:
URL: https://github.com/apache/flink-web/pull/654#discussion_r1242939069


##
docs/data/release_archive.yml:
##
@@ -482,6 +482,11 @@ release_archive:
   version: 3.0.1
   release_date: 2023-05-09
   filename: "gcp-pubsub"
+- name: "Flink JDBC Connector"
+  connector: "jdbc"
+  version: 3.1.1
+  release_date: 2023-05-31

Review Comment:
   I assume the date  should be changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1242932736


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##
@@ -469,10 +469,10 @@ private static List allTypesBasic() {
 .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 539222987)
 .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123)
 .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123)
-.fromCase(FLOAT(), 9234567891.12, 644633299)
+.fromCase(FLOAT(), 9234567891.12, 2147483647)

Review Comment:
   Added release notes to the JIRA issue



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28744) Upgrade Calcite version to 1.31

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-28744:

Release Note: 
Calcite 1.31.0 fixes https://issues.apache.org/jira/browse/CALCITE-4861 
(Optimization of chained CAST calls can lead to unexpected behavior), which 
changes the behavior of Flink's casts between numeric types.
This can lead to different results in SQL casts in cases where one of the 
intermediate types overflows.
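
A minimal plain-Java illustration of the two narrowing behaviors involved (treating the old plan as going through a BIGINT intermediate is an assumption here):

{code:java}
public class NarrowingDemo {
    public static void main(String[] args) {
        // long -> int wraps modulo 2^32, matching the old expected value
        System.out.println((int) 9_234_567_891L);   // 644633299
        // float -> int saturates at the bounds, matching the new expected value
        System.out.println((int) 9.23456789112E9f); // 2147483647
    }
}
{code}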

> Upgrade Calcite version to 1.31
> ---
>
> Key: FLINK-28744
> URL: https://issues.apache.org/jira/browse/FLINK-28744
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should upgrade to Calcite 1.31 so we can benefit from 
> https://issues.apache.org/jira/browse/CALCITE-4865



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1242916945


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##
@@ -1132,22 +1132,14 @@ private static List numericBounds() {
 .fromCase(DOUBLE(), -1.7976931348623157E308d, 
Float.NEGATIVE_INFINITY)
 .build(),
 CastTestSpecBuilder.testCastTo(DECIMAL(38, 0))
-.fromCase(TINYINT(), Byte.MIN_VALUE - 1, new 
BigDecimal(Byte.MIN_VALUE - 1))
-.fromCase(TINYINT(), Byte.MAX_VALUE + 1, new 
BigDecimal(Byte.MAX_VALUE + 1))
-.fromCase(
-SMALLINT(),
-Short.MIN_VALUE - 1,
-new BigDecimal(Short.MIN_VALUE - 1))
-.fromCase(
-SMALLINT(),
-Short.MAX_VALUE + 1,
-new BigDecimal(Short.MAX_VALUE + 1))
-.fromCase(
-INT(), Integer.MIN_VALUE - 1, new 
BigDecimal(Integer.MIN_VALUE - 1))
-.fromCase(
-INT(), Integer.MAX_VALUE + 1, new 
BigDecimal(Integer.MAX_VALUE + 1))
-.fromCase(BIGINT(), Long.MIN_VALUE - 1, new 
BigDecimal(Long.MIN_VALUE - 1))
-.fromCase(BIGINT(), Long.MAX_VALUE + 1, new 
BigDecimal(Long.MAX_VALUE + 1))
+.fromCase(TINYINT(), Byte.MIN_VALUE - 1, new 
BigDecimal(Byte.MAX_VALUE))

Review Comment:
   Maybe I didn't get what you mean... I added a cast to `byte` for the input; 
however, the expected type here is `BigDecimal`, since it is cast to 
`DECIMAL(38, 0)` here.
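
   A tiny illustration of why the expected value becomes `Byte.MAX_VALUE` here: `Byte.MIN_VALUE - 1` is int arithmetic, and narrowing -129 back to byte wraps around:

```java
public class ByteNarrowingDemo {
    public static void main(String[] args) {
        int outOfRange = Byte.MIN_VALUE - 1; // -129: the subtraction is int arithmetic
        byte wrapped = (byte) outOfRange;    // narrowing keeps the low 8 bits: 127
        System.out.println(outOfRange + " -> " + wrapped); // -129 -> 127 (Byte.MAX_VALUE)
    }
}
```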



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1242916945


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##
@@ -1132,22 +1132,14 @@ private static List numericBounds() {
 .fromCase(DOUBLE(), -1.7976931348623157E308d, 
Float.NEGATIVE_INFINITY)
 .build(),
 CastTestSpecBuilder.testCastTo(DECIMAL(38, 0))
-.fromCase(TINYINT(), Byte.MIN_VALUE - 1, new 
BigDecimal(Byte.MIN_VALUE - 1))
-.fromCase(TINYINT(), Byte.MAX_VALUE + 1, new 
BigDecimal(Byte.MAX_VALUE + 1))
-.fromCase(
-SMALLINT(),
-Short.MIN_VALUE - 1,
-new BigDecimal(Short.MIN_VALUE - 1))
-.fromCase(
-SMALLINT(),
-Short.MAX_VALUE + 1,
-new BigDecimal(Short.MAX_VALUE + 1))
-.fromCase(
-INT(), Integer.MIN_VALUE - 1, new 
BigDecimal(Integer.MIN_VALUE - 1))
-.fromCase(
-INT(), Integer.MAX_VALUE + 1, new 
BigDecimal(Integer.MAX_VALUE + 1))
-.fromCase(BIGINT(), Long.MIN_VALUE - 1, new 
BigDecimal(Long.MIN_VALUE - 1))
-.fromCase(BIGINT(), Long.MAX_VALUE + 1, new 
BigDecimal(Long.MAX_VALUE + 1))
+.fromCase(TINYINT(), Byte.MIN_VALUE - 1, new 
BigDecimal(Byte.MAX_VALUE))

Review Comment:
   Maybe I didn't get what you mean... I added a cast for the input; however, the 
expected type here is `BigDecimal`, since it is cast to `DECIMAL(38, 0)` here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737379#comment-17737379
 ] 

Sergey Nuyanzin commented on FLINK-30719:
-

it looks like a frontend-maven-plugin issue, as mentioned at 
https://github.com/eirslett/frontend-maven-plugin/issues/882

> flink-runtime-web failed due to a corrupted 
> 
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737376#comment-17737376
 ] 

Sergey Nuyanzin commented on FLINK-30719:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50291&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12046


> flink-runtime-web failed due to a corrupted 
> 
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30719) flink-runtime-web failed due to a corrupted

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-30719:

Affects Version/s: 1.18.0

> flink-runtime-web failed due to a corrupted 
> 
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32442) DownloadPipelineArtifact fails on AZP

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32442:

Description: 
DownloadPipelineArtifact fails on AZP
{noformat}
Starting: DownloadPipelineArtifact
==
Task : Download Pipeline Artifacts
Description  : Download build and pipeline artifacts
Version  : 2.198.0
Author   : Microsoft Corporation
Help : 
https://docs.microsoft.com/azure/devops/pipelines/tasks/utility/download-pipeline-artifact
==
Download from the specified build: #50309
Download artifact to: /home/agent02/_work/2/flink_artifact
##[error]Cannot assign requested address
Finishing: DownloadPipelineArtifact

{noformat}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50309&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=34dbf679-0f1d-54d2-de92-a83b268b346a&l=11

  was:
DownloadPipelineArtifact fails on AZP

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50309&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=34dbf679-0f1d-54d2-de92-a83b268b346a&l=11


> DownloadPipelineArtifact fails on AZP
> -
>
> Key: FLINK-32442
> URL: https://issues.apache.org/jira/browse/FLINK-32442
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: test-stability
>
> DownloadPipelineArtifact fails on AZP
> {noformat}
> Starting: DownloadPipelineArtifact
> ==
> Task : Download Pipeline Artifacts
> Description  : Download build and pipeline artifacts
> Version  : 2.198.0
> Author   : Microsoft Corporation
> Help : 
> https://docs.microsoft.com/azure/devops/pipelines/tasks/utility/download-pipeline-artifact
> ==
> Download from the specified build: #50309
> Download artifact to: /home/agent02/_work/2/flink_artifact
> ##[error]Cannot assign requested address
> Finishing: DownloadPipelineArtifact
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50309&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=34dbf679-0f1d-54d2-de92-a83b268b346a&l=11



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32442) DownloadPipelineArtifact fails on AZP

2023-06-26 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32442:
---

 Summary: DownloadPipelineArtifact fails on AZP
 Key: FLINK-32442
 URL: https://issues.apache.org/jira/browse/FLINK-32442
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


DownloadPipelineArtifact fails on AZP

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50309&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=34dbf679-0f1d-54d2-de92-a83b268b346a&l=11



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18356) flink-table-planner Exit code 137 returned from process

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737371#comment-17737371
 ] 

Sergey Nuyanzin commented on FLINK-18356:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50282&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12267

> flink-table-planner Exit code 137 returned from process
> ---
>
> Key: FLINK-18356
> URL: https://issues.apache.org/jira/browse/FLINK-18356
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Tests
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: 1234.jpg, app-profiling_4.gif, 
> image-2023-01-11-22-21-57-784.png, image-2023-01-11-22-22-32-124.png, 
> image-2023-02-16-20-18-09-431.png
>
>
> {noformat}
> = test session starts 
> ==
> platform linux -- Python 3.7.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1
> cachedir: .tox/py37-cython/.pytest_cache
> rootdir: /__w/3/s/flink-python
> collected 568 items
> pyflink/common/tests/test_configuration.py ..[  
> 1%]
> pyflink/common/tests/test_execution_config.py ...[  
> 5%]
> pyflink/dataset/tests/test_execution_environment.py .
> ##[error]Exit code 137 returned from process: file name '/bin/docker', 
> arguments 'exec -i -u 1002 
> 97fc4e22522d2ced1f4d23096b8929045d083dd0a99a4233a8b20d0489e9bddb 
> /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'.
> Finishing: Test - python
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3729&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737369#comment-17737369
 ] 

Sergey Nuyanzin commented on FLINK-32370:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50353&view=logs&j=fb37c667-81b7-5c22-dd91-846535e99a97&t=011e961e-597c-5c96-04fe-7941c8b83f23&l=3674

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737368#comment-17737368
 ] 

Sergey Nuyanzin commented on FLINK-32370:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50353&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3450

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32379) Skip archunit tests in java1X-target profiles

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737367#comment-17737367
 ] 

Sergey Nuyanzin commented on FLINK-32379:
-

[~chesnay] are you ok with the {{@ArchTag}} approach?
I could submit a PR for that.
I'm asking since the task is assigned to you.

> Skip archunit tests in java1X-target profiles
> -
>
> Key: FLINK-32379
> URL: https://issues.apache.org/jira/browse/FLINK-32379
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> When compiling to Java 11/17 byte code archunit fails; not sure why. Maybe it 
> finds more/less stuff or signatures are represented differently.
> In any case let's use the Java 8 bytecode version as the "canonical" version 
> and skip archunit otherwise.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32379) Skip archunit tests in java1X-target profiles

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32379:

Labels: test-stability  (was: )

> Skip archunit tests in java1X-target profiles
> -
>
> Key: FLINK-32379
> URL: https://issues.apache.org/jira/browse/FLINK-32379
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> When compiling to Java 11/17 byte code archunit fails; not sure why. Maybe it 
> finds more/less stuff or signatures are represented differently.
> In any case let's use the Java 8 bytecode version as the "canonical" version 
> and skip archunit otherwise.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32379) Skip archunit tests in java1X-target profiles

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737366#comment-17737366
 ] 

Sergey Nuyanzin commented on FLINK-32379:
-

Since it's failing CI builds on the mirror, I'm upgrading it to Critical:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50360&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23263

> Skip archunit tests in java1X-target profiles
> -
>
> Key: FLINK-32379
> URL: https://issues.apache.org/jira/browse/FLINK-32379
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.18.0
>
>
> When compiling to Java 11/17 byte code archunit fails; not sure why. Maybe it 
> finds more/less stuff or signatures are represented differently.
> In any case let's use the Java 8 bytecode version as the "canonical" version 
> and skip archunit otherwise.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32379) Skip archunit tests in java1X-target profiles

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32379:

Priority: Critical  (was: Major)

> Skip archunit tests in java1X-target profiles
> -
>
> Key: FLINK-32379
> URL: https://issues.apache.org/jira/browse/FLINK-32379
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.18.0
>
>
> When compiling to Java 11/17 byte code archunit fails; not sure why. Maybe it 
> finds more/less stuff or signatures are represented differently.
> In any case let's use the Java 8 bytecode version as the "canonical" version 
> and skip archunit otherwise.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737365#comment-17737365
 ] 

Sergey Nuyanzin commented on FLINK-32370:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50360&view=logs&j=fb37c667-81b7-5c22-dd91-846535e99a97&t=011e961e-597c-5c96-04fe-7941c8b83f23&l=3701

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32392) Several jobs failed on AZP with No space left on device

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737364#comment-17737364
 ] 

Sergey Nuyanzin commented on FLINK-32392:
-

1.17.x: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50397&view=logs&j=585d8b77-fa33-51bc-8163-03e54ba9ce5b&t=68e20e55-906c-5c49-157c-3005667723c9

> Several jobs failed on AZP with No space left on device
> ---
>
> Key: FLINK-32392
> URL: https://issues.apache.org/jira/browse/FLINK-32392
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Sergey Nuyanzin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This Build failed with no space left 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50162&view=logs&j=585d8b77-fa33-51bc-8163-03e54ba9ce5b
> {noformat}
> ##[error]Unhandled exception. System.IO.IOException: No space left on device 
> : '/home/vsts/agents/3.220.5/_diag/Worker_20230619-021757-utc.log'
>at System.IO.RandomAccess.WriteAtOffset(SafeFileHandle handle, 
> ReadOnlySpan`1 buffer, Int64 fileOffset)
>at System.IO.Strategies.BufferedFileStreamStrategy.FlushWrite()
>at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder)
>at System.Diagnostics.TextWriterTraceListener.Flush()
>at 
> Microsoft.VisualStudio.Services.Agent.HostTraceListener.WriteHeader(String 
> source, TraceEventType eventType, Int32 id) in 
> /home/vsts/work/1/s/src/Microsoft.VisualStudio.Services.Agent/HostTraceListener.cs:line
>  151
>at 
> Microsoft.VisualStudio.Services.Agent.HostTraceListener.TraceEvent(TraceEventCache
>  eventCache, String source, TraceEventType eventType, Int32 id, String 
> message) in 
> /home/vsts/work/1/s/src/Microsoft.VisualStudio.Services.Agent/HostTraceListener.cs:line
>  81
>at System.Diagnostics.TraceSource.TraceEvent(TraceEventType eventType, 
> Int32 id, String message)
>at 
> Microsoft.VisualStudio.Services.Agent.Util.ProcessInvoker.ProcessExitedHandler(Object
>  sender, EventArgs e) in 
> /home/vsts/work/1/s/src/Agent.Sdk/ProcessInvoker.cs:line 496
>at System.Diagnostics.Process.OnExited()
>at System.Diagnostics.Process.RaiseOnExited()
>at System.Diagnostics.Process.CompletionCallback(Object waitHandleContext, 
> Boolean wasSignaled)
>at 
> System.Threading._ThreadPoolWaitOrTimerCallback.WaitOrTimerCallback_Context_f(Object
>  state)
>at System.Threading.ExecutionContext.RunInternal(ExecutionContext 
> executionContext, ContextCallback callback, Object state)
> --- End of stack trace from previous location ---
>at 
> System.Threading._ThreadPoolWaitOrTimerCallback.PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback
>  helper, Boolean timedOut)
>at System.Threading.PortableThreadPool.CompleteWait(RegisteredWaitHandle 
> handle, Boolean timedOut)
>at System.Threading.ThreadPoolWorkQueue.Dispatch()
>at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
> ,##[error]The hosted runner encountered an error while running your job. 
> (Error Type: Failure).
> {noformat}
> for 1.16, 1.17 it happens while   'Upload artifacts to S3'
> for 1.18 while 'Deploy maven snapshot'



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-26 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32441:
---

 Summary: 
DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
timeout on AZP
 Key: FLINK-32441
 URL: https://issues.apache.org/jira/browse/FLINK-32441
 Project: Flink
  Issue Type: Bug
  Components: API / Core, Tests
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9274

fails with timeout on 
{{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242874089


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   I checked the isEqual method in ARRAY_DISTINCT, but I cannot find its source 
code, so I don't know the underlying logic of isEqual.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242867857


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   To be honest, I didn't know this uses Java's equals under the hood, which is not 
100% the same as SQL equality. 
   Why is binaryData's equals not in sync with SQL equality?
   But if we pass the input data through the Expression path, the data we receive in 
MapUnionFunction is a binaryMap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242867857


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   To be honest, I didn't know the API and SQL behave differently here, or why 
binaryData's equals is not in sync with SQL equality.
   But if we pass the input data through the Expression path, the data we receive in 
MapUnionFunction is a binaryMap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242863699


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   Can you please elaborate on where you have seen guarantees that binaryData's 
equals is in sync with SQL equality?
   Also, there is no limitation preventing non-binary data given the current method 
signature, so we cannot rely on it.
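
   A concrete plain-JDK instance of that gap: `HashMap` keys use `Object#equals`, which for `BigDecimal` also compares scale, while SQL equality compares the numeric value:

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class EqualsVsSqlEquality {
    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("1.0");
        BigDecimal b = new BigDecimal("1.00");
        System.out.println(a.compareTo(b) == 0); // true: SQL equality sees one key
        System.out.println(a.equals(b));         // false: HashMap sees two keys
        Map<Object, Object> union = new HashMap<>();
        union.put(a, "x");
        union.put(b, "y");
        System.out.println(union.size()); // 2, where SQL MAP_UNION semantics might expect 1
    }
}
```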



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242856919


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();
+for (int i = 0; i < map1.size(); i++) {
+final Object key = 
keyElementGetter.getElementOrNull(map1.keyArray(), i);
+final Object value = 
valueElementGetter.getElementOrNull(map1.valueArray(), i);
+this.map.put(key, value);
+}
+for (int i = 0; i < map2.size(); i++) {
+final Object key = 
keyElementGetter.getElementOrNull(map2.keyArray(), i);
+final Object value = 
valueElementGetter.getElementOrNull(map2.valueArray(), i);
+this.map.put(key, value);
+}
+this.updateArrays();
+}
+
+@Override
+public int size() {
+return map.size();
+}
+
+@Override
+public ArrayData keyArray() {
+return keysArray;
+}
+
+@Override
+public ArrayData valueArray() {
+return valuesArray;
+}
+
+private void updateArrays() {
+this.keysArray = new GenericArrayData(map.keySet().toArray());

Review Comment:
   If we use the equalityEvaluator, we will update the map many times. So I 
think we should keep it as is for now.



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242850416


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   And I checked the source code: if the data can be passed to this function, 
it is all binaryData, so we can use Java's equals.






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242849867


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);

Review Comment:
   If both are empty, just return either of them.
   Returning a new empty map creates a new object, so this is not the best 
option.
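
   A minimal sketch of how that short-circuit could look in `eval` (the rest 
of the class as in the diff); returning the other map when one side is empty 
also covers the case where both are empty, without allocating anything:

   ```java
   public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData map2) {
       if (map1 == null || map2 == null) {
           return null;
       }
       if (map1.size() == 0) {
           return map2; // also handles "both empty": map2 is returned as-is
       }
       if (map2.size() == 0) {
           return map1;
       }
       return new MapDataForMapUnion(map1, map2);
   }
   ```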






[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242838442


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1423,6 +1424,16 @@ public OutType mapEntries() {
 return toApiSpecificExpression(unresolvedCall(MAP_ENTRIES, toExpr()));
 }
 
+/**
+ * Returns a map created by merging two maps, 'map1' and 'map2'. This two 
maps should have same

Review Comment:
   I will fix it. Thank you.



##
flink-python/pyflink/table/expression.py:
##
@@ -1528,6 +1528,18 @@ def map_keys(self) -> 'Expression':
 """
 return _unary_op("mapKeys")(self)
 
+@property
+def map_union(self, map2) -> 'Expression':
+"""
+Returns a map created by merging two maps, 'map1' and 'map2'.
+This two maps should have same data structure. If there are 
overlapping keys,

Review Comment:
   I will fix it. Thank you.






[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242839050


##
docs/data/sql_functions.yml:
##
@@ -649,6 +649,9 @@ collection:
   - sql: MAP_KEYS(map)
 table: MAP.mapKeys()
 description: Returns the keys of the map as array. No order guaranteed.
+SQL: MAP_UNION(map1, map2)
+Table: map1.mapUnion(map2)
+Description: Returns a map created by merging two maps, 'map1' and 'map2'. 
This two maps should have same data structure. If there are overlapping keys, 
the value from 'map2' will overwrite the value from 'map1'. If any of maps are 
null, return null.

Review Comment:
   I will fix it. Thank you.






[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242836983


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);

Review Comment:
   I can add an if statement: if one map is empty, I can directly return the 
non-empty one.
   If both maps are empty, I can either return the first map or return a new 
empty map.
   Which one is better?






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242829756


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   IMHO first we should care about correctness and only then about speed






[jira] [Updated] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32370:

Priority: Critical  (was: Major)

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[jira] [Updated] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32370:

Labels: pull-request-available test-stability  (was: pull-request-available)

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242823684


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   But using the equalityEvaluator, the time complexity will become O(n²), 
because we cannot use a HashMap. Is that OK?
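
   For reference, the quadratic shape under discussion would look roughly like 
the sketch below. `sqlEquals` is only a stand-in `BiPredicate` (hypothetical, 
not the actual evaluator API) for the code-generated equality handle: keys 
that are SQL-equal need not be Java-equal, so they cannot be deduplicated 
through a `HashMap`, and each insert degrades to a linear scan.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BiPredicate;

   // Sketch: union of two key/value lists under SQL-style equality.
   // O(n^2) overall, because every key is compared pairwise instead of hashed.
   final class SqlEqualityUnionSketch {

       static void put(List<Object> keys, List<Object> values,
                       Object key, Object value,
                       BiPredicate<Object, Object> sqlEquals) {
           for (int i = 0; i < keys.size(); i++) {
               if (sqlEquals.test(keys.get(i), key)) {
                   values.set(i, value); // overwrite: map2's value wins
                   return;
               }
           }
           keys.add(key);
           values.add(value);
       }

       public static void main(String[] args) {
           List<Object> keys = new ArrayList<>();
           List<Object> values = new ArrayList<>();
           // Placeholder equality; the real function would invoke the
           // code-generated SQL equality instead.
           BiPredicate<Object, Object> sqlEquals = Object::equals;
           put(keys, values, "a", 1, sqlEquals);
           put(keys, values, "a", 2, sqlEquals);
           System.out.println(keys + " -> " + values); // prints [a] -> [2]
       }
   }
   ```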






[jira] [Reopened] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reopened FLINK-32370:
-

It seems the issue is still present.

The mirror build is failing with it:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50436&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3520

It contains both commits attached to this JIRA issue.

[~zjureel] could you please have a look?

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242809920


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonMapInputTypeStrategy.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** An {@link InputTypeStrategy} that expects that all arguments have a common 
map type. */
+@Internal
+public final class CommonMapInputTypeStrategy implements InputTypeStrategy {

Review Comment:
   Because the two maps should have the same data structure: their key types 
should be the same, and their value types should also be the same. We cannot 
union two maps whose data structures differ.
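
   A rough sketch of the check such a strategy has to perform (assuming 
`LogicalTypeMerging.findCommonType` is the mechanism, as the imports in the 
diff suggest; the surrounding control flow is illustrative only): both 
arguments must be MAPs, and a common key type and a common value type must 
both exist.

   ```java
   import java.util.Arrays;
   import java.util.Optional;

   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.MapType;
   import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;

   final class CommonMapTypeSketch {

       // Returns the merged MAP type if one exists, empty otherwise.
       static Optional<MapType> commonMapType(LogicalType t1, LogicalType t2) {
           if (!(t1 instanceof MapType) || !(t2 instanceof MapType)) {
               return Optional.empty();
           }
           final MapType m1 = (MapType) t1;
           final MapType m2 = (MapType) t2;
           final Optional<LogicalType> commonKey =
                   LogicalTypeMerging.findCommonType(
                           Arrays.asList(m1.getKeyType(), m2.getKeyType()));
           final Optional<LogicalType> commonValue =
                   LogicalTypeMerging.findCommonType(
                           Arrays.asList(m1.getValueType(), m2.getValueType()));
           if (!commonKey.isPresent() || !commonValue.isPresent()) {
               return Optional.empty();
           }
           return Optional.of(new MapType(commonKey.get(), commonValue.get()));
       }
   }
   ```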






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242783033


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();

Review Comment:
   `HashMap` under the hood uses Java's `equals`, which is not 100% the same 
as SQL equality.
   We need to use SQL semantics via an `equalityEvaluator`, as is done for 
instance for `ARRAY_DISTINCT`.
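
   From memory, the `ARRAY_DISTINCT`-style wiring looks roughly like the 
sketch below; the field names (`key1`/`key2`), the helper class, and the exact 
signatures are recalled rather than verified against this branch, so treat it 
as an approximation of the pattern, not authoritative API usage.

   ```java
   import static org.apache.flink.table.api.Expressions.$;

   import java.lang.invoke.MethodHandle;

   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.functions.FunctionContext;
   import org.apache.flink.table.functions.SpecializedFunction;
   import org.apache.flink.table.types.DataType;

   // Sketch: obtaining a code-generated SQL equality for map keys.
   class KeyEqualitySketch {
       private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
       private transient MethodHandle keyEqualityHandle;

       KeyEqualitySketch(SpecializedFunction.SpecializedContext context, DataType keyType) {
           keyEqualityEvaluator =
                   context.createEvaluator(
                           $("key1").isEqual($("key2")),
                           DataTypes.BOOLEAN(),
                           DataTypes.FIELD("key1", keyType.notNull().toInternal()),
                           DataTypes.FIELD("key2", keyType.notNull().toInternal()));
       }

       public void open(FunctionContext context) throws Exception {
           keyEqualityHandle = keyEqualityEvaluator.open(context);
       }

       boolean keysEqual(Object k1, Object k2) throws Throwable {
           // Invokes the generated code implementing SQL '='.
           return (boolean) keyEqualityHandle.invoke(k1, k2);
       }
   }
   ```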






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242773854


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonMapInputTypeStrategy.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** An {@link InputTypeStrategy} that expects that all arguments have a common 
map type. */
+@Internal
+public final class CommonMapInputTypeStrategy implements InputTypeStrategy {

Review Comment:
   This class looks the same as 
`org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy` 
(except for the class name).
   Why do we need it?






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242769137


##
docs/data/sql_functions.yml:
##
@@ -649,6 +649,9 @@ collection:
   - sql: MAP_KEYS(map)
 table: MAP.mapKeys()
 description: Returns the keys of the map as array. No order guaranteed.
+SQL: MAP_UNION(map1, map2)
+Table: map1.mapUnion(map2)
+Description: Returns a map created by merging two maps, 'map1' and 'map2'. 
This two maps should have same data structure. If there are overlapping keys, 
the value from 'map2' will overwrite the value from 'map1'. If any of maps are 
null, return null.

Review Comment:
   ```suggestion
   Description: Returns a map created by merging two maps, 'map1' and 
'map2'. These two maps should have same data structure. If there are 
overlapping keys, the value from 'map2' will overwrite the value from 'map1'. 
If any of maps is null, return null.
   ```






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242768691


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1423,6 +1424,16 @@ public OutType mapEntries() {
 return toApiSpecificExpression(unresolvedCall(MAP_ENTRIES, toExpr()));
 }
 
+/**
+ * Returns a map created by merging two maps, 'map1' and 'map2'. This two 
maps should have same

Review Comment:
   ```suggestion
* Returns a map created by merging two maps, 'map1' and 'map2'. These 
two maps should have same
   ```
   since it is plural






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242767973


##
flink-python/pyflink/table/expression.py:
##
@@ -1528,6 +1528,18 @@ def map_keys(self) -> 'Expression':
 """
 return _unary_op("mapKeys")(self)
 
+@property
+def map_union(self, map2) -> 'Expression':
+"""
+Returns a map created by merging two maps, 'map1' and 'map2'.
+This two maps should have same data structure. If there are 
overlapping keys,

Review Comment:
   ```suggestion
   These two maps should have same data structure. If there are 
overlapping keys,
   ```






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242766634


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;

Review Comment:
   It could be final, I guess.






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242765635


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);
+return resultMap;
+}
+
+private class MapDataForMapUnion implements MapData {
+private ArrayData keysArray;
+private ArrayData valuesArray;
+private final Map<Object, Object> map;
+
+public MapDataForMapUnion(MapData map1, MapData map2) {
+
+this.map = new HashMap<>();
+for (int i = 0; i < map1.size(); i++) {
+final Object key = 
keyElementGetter.getElementOrNull(map1.keyArray(), i);
+final Object value = 
valueElementGetter.getElementOrNull(map1.valueArray(), i);
+this.map.put(key, value);
+}
+for (int i = 0; i < map2.size(); i++) {
+final Object key = 
keyElementGetter.getElementOrNull(map2.keyArray(), i);
+final Object value = 
valueElementGetter.getElementOrNull(map2.valueArray(), i);
+this.map.put(key, value);
+}
+this.updateArrays();
+}
+
+@Override
+public int size() {
+return map.size();
+}
+
+@Override
+public ArrayData keyArray() {
+return keysArray;
+}
+
+@Override
+public ArrayData valueArray() {
+return valuesArray;
+}
+
+private void updateArrays() {
+this.keysArray = new GenericArrayData(map.keySet().toArray());

Review Comment:
   do we need a separate method for that if it's used only once and contains 
only 2 lines?
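
   If the `HashMap`-based approach is kept, inlining those two lines into the 
constructor would address both this comment and the earlier "could be final" 
remark. A sketch (types as in the diff; `copyInto` is a helper introduced only 
for the sketch):

   ```java
   private class MapDataForMapUnion implements MapData {
       private final ArrayData keysArray;
       private final ArrayData valuesArray;
       private final Map<Object, Object> map;

       MapDataForMapUnion(MapData map1, MapData map2) {
           this.map = new HashMap<>();
           copyInto(map1);
           copyInto(map2); // map2 last, so its values win on duplicate keys

           // Inlined from updateArrays(): build both arrays exactly once,
           // which also lets the fields be final.
           this.keysArray = new GenericArrayData(map.keySet().toArray());
           this.valuesArray = new GenericArrayData(map.values().toArray());
       }

       private void copyInto(MapData source) {
           for (int i = 0; i < source.size(); i++) {
               map.put(
                       keyElementGetter.getElementOrNull(source.keyArray(), i),
                       valueElementGetter.getElementOrNull(source.valueArray(), i));
           }
       }

       @Override
       public int size() {
           return map.size();
       }

       @Override
       public ArrayData keyArray() {
           return keysArray;
       }

       @Override
       public ArrayData valueArray() {
           return valuesArray;
       }
   }
   ```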






[GitHub] [flink] snuyanzin commented on a diff in pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1242763657


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.KeyValueDataType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+}
+
+public @Nullable MapData eval(@Nullable MapData map1, @Nullable MapData 
map2) {
+if (map1 == null || map2 == null) {
+return null;
+}
+MapDataForMapUnion resultMap = new MapDataForMapUnion(map1, map2);

Review Comment:
   no need for new map creation if one of the maps is empty






[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-06-26 Thread via GitHub


afedulov commented on code in PR #22850:
URL: https://github.com/apache/flink/pull/22850#discussion_r1242728328


##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   Since `TestCodeArchitectureTestBase` uses `ITCaseRules`, I assume you intend 
to move it too? This would pull `flink-test-utils`, 
`flink-connector-test-utils` and `junit-jupiter-api` with it into 
`architecture-tests-base`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22876: [FLINK-24911][table] Enable line numbers in SQL Client

2023-06-26 Thread via GitHub


flinkbot commented on PR #22876:
URL: https://github.com/apache/flink/pull/22876#issuecomment-1608194141

   
   ## CI report:
   
   * 0947cee642edb4e04f01db7797bacedd21878b55 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-24911) Enable line numbers in SQL Client

2023-06-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24911:
---
Labels: pull-request-available  (was: )

> Enable line numbers in SQL Client
> -
>
> Key: FLINK-24911
> URL: https://issues.apache.org/jira/browse/FLINK-24911
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Should be enabled/disabled via property 
> {{sql-client.prompt.show-line-numbers}}
> Also add a widget to make it possible to toggle them with a keystroke



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin opened a new pull request, #22876: [FLINK-24911][table] Enable line numbers in SQL Client

2023-06-26 Thread via GitHub


snuyanzin opened a new pull request, #22876:
URL: https://github.com/apache/flink/pull/22876

   ## What is the purpose of the change
   
   The PR enables line numbers in the SQL Client for multiline SQL.
   In fact, line numbers are already implemented in JLine; the only thing the 
PR does is allow toggling them via a property.
   
   
   ## Verifying this change
   
   This change is a trivial rework.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no )
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes )
 - If yes, how is the feature documented? (docs )
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #22834: [FLINK-32260]-table-Add-ARRAY_SLICE-function

2023-06-26 Thread via GitHub


snuyanzin commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1608174310

   Thanks for your contribution @hanyuzheng7.
   I left some comments. Besides that:
   1. The Chinese version of the documentation should be updated as well.
   2. The PR should be rebased to resolve conflicts.
   3. I put some suggestions about `ARRAY_SLICE` behavior in the corresponding 
JIRA issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32260) Add SLICE support in SQL & Table API

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737329#comment-17737329
 ] 

Sergey Nuyanzin edited comment on FLINK-32260 at 6/26/23 8:08 PM:
--

I would suggest changing the behavior of the function a bit to be consistent 
with other vendors.

1. Rename args. IMHO Google Spanner[1] has the best naming, 
{{ARRAY_SLICE(array_to_slice, start_offset, end_offset)}}; others also have 
reasonable naming. The main issue is that the vendors mentioned here support a 
zero or negative third arg, in which case it is not clear what, for instance, 
a negative length would mean.
2. Return {{NULL}} *+only+* if one of the input args is {{NULL}}. Otherwise 
return the sliced array or an empty array. This is the behavior of Google 
Spanner[1], Cosmos DB[2], ClickHouse[3], DuckDB[4], Snowflake[5].
3. It is OK for the third arg to be 0 or negative; a non-{{NULL}} value should 
be returned.
4. ClickHouse[3] and Cosmos DB[2] have the third arg as optional, meaning that 
if it is not specified the array is sliced till the end (similar to 
{{substring}}). It would make sense to have this as well.


[1] 
https://cloud.google.com/spanner/docs/reference/standard-sql/array_functions#array_slice
[2] https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/array-slice
[3] 
https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayslice
[4] https://duckdb.org/docs/sql/functions/nested#list-functions
[5] https://docs.snowflake.com/en/sql-reference/functions/array_slice

P.S. For some vendors this behavior is not specified explicitly in the docs; 
however, it can be double-checked against the latest versions (I verified it 
myself)
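
A minimal Java sketch of the proposed semantics (my illustration, not the PR's 
implementation; the exact offset convention and the {{elementGetter}} field 
are assumptions borrowed from the PR's ArraySliceFunction):

{code:java}
// NULL only when an argument is NULL; otherwise a (possibly empty) array.
public @Nullable ArrayData eval(
        @Nullable ArrayData array, @Nullable Integer start, @Nullable Integer end) {
    if (array == null || start == null || end == null) {
        return null;
    }
    final int size = array.size();
    // Negative offsets count from the end of the array (-1 is the last element).
    int from = start > 0 ? start - 1 : size + start;
    int to = end > 0 ? end : size + end + 1;
    from = Math.max(from, 0);
    to = Math.min(to, size);
    if (from >= to) {
        return new GenericArrayData(new Object[0]); // empty array, not NULL
    }
    final Object[] slice = new Object[to - from];
    for (int i = from; i < to; i++) {
        slice[i - from] = elementGetter.getElementOrNull(array, i);
    }
    return new GenericArrayData(slice);
}
{code}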


was (Author: sergey nuyanzin):
I would suggest changing the behavior of the function a bit to be consistent 
with other vendors.

1. Rename args. IMHO Google Spanner[1] has the best naming 
{{ARRAY_SLICE(array_to_slice, start_offset, end_offset)}}.
2. Return {{NULL}} *+only+* if one of the input args is {{NULL}}. Otherwise 
return sliced array or empty array. This is the behavior of Google Spanner[1], 
Cosmos DB[2], ClickHouse[3], DuckDB[4], Snowflake[5].
3. It is OK for the third arg to be 0 or negative; a non-{{NULL}} value should 
be returned.
4. ClickHouse[3] and Cosmos DB[2] have the third arg as optional, meaning that 
if it is not specified the array is sliced till the end (similar to 
{{substring}}). It would make sense to have this as well.


[1] 
https://cloud.google.com/spanner/docs/reference/standard-sql/array_functions#array_slice
[2] https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/array-slice
[3] 
https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayslice
[4] https://duckdb.org/docs/sql/functions/nested#list-functions
[5] https://docs.snowflake.com/en/sql-reference/functions/array_slice

P.S. For some vendors this behavior is not specified explicitly in the docs; 
however, it can be double-checked against the latest versions (I verified it 
myself)

> Add SLICE support in SQL & Table API
> 
>
> Key: FLINK-32260
> URL: https://issues.apache.org/jira/browse/FLINK-32260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Implement the {{array_slice}} function to extract a subset of elements from 
> an array.
> Description: The {{array_slice}} function in the ETL pipeline allows you to 
> extract a subset of elements from an array based on the specified starting 
> index and length. It supports both positive and negative indices, where 
> positive indices start from 1 (the first element) and negative indices start 
> from the end of the array (-1 being the last element).
> Syntax:
>  
> code
> {code:java}
> array_slice[x: array, start: int, length: int] -> array{code}
> {{ }}
> Arguments:
>  * {{{}x{}}}: The input array from which to extract the subset of elements.
>  * {{{}start{}}}: The starting index of the subset. If positive, it 
> represents the index from the beginning of the array. If negative, it 
> represents the index from the end of the array (-1 being the last element).
>  * {{{}length{}}}: The length of the subset to be extracted.
> Returns: An array containing the subset of elements extracted from the input 
> array {{{}x{}}}. The subset starts from the specified {{start}} index and has 
> the specified {{{}length{}}}.
> Examples:
>  # Extracting a subset from an array starting from index 2 with length 2:
>  
> {code:java}
> array_slice[array[1, 2, 3, 4], 2, 2]  Output: [2, 3]{code}
>      2. Extracting a subset from an array starting from the second-to-last 
> element with length 
>  
> {code:java}
> array_slice[array[1, 2, 3, 4], -2, 2]
> Output: [3, 4]{code}
> see also:
> spark:[https://spark.apache.org/docs/latest/api/sql/index.html#slice]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Commented] (FLINK-32260) Add SLICE support in SQL & Table API

2023-06-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737329#comment-17737329
 ] 

Sergey Nuyanzin commented on FLINK-32260:
-

I would suggest changing the behavior of the function a bit to be consistent 
with other vendors.

1. Rename args. IMHO Google Spanner[1] has the best naming: 
{{ARRAY_SLICE(array_to_slice, start_offset, end_offset)}}.
2. Return {{NULL}} *+only+* if one of the input args is {{NULL}}. Otherwise 
return the sliced array or an empty array. This is the behavior of Google 
Spanner[1], Cosmos DB[2], ClickHouse[3], DuckDB[4], Snowflake[5].
3. ClickHouse[3] and Cosmos DB[2] have the third arg as optional, meaning that 
if it is not specified the array is sliced till the end (similar to 
{{substring}})


[1] 
https://cloud.google.com/spanner/docs/reference/standard-sql/array_functions#array_slice
[2] https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/array-slice
[3] 
https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayslice
[4] https://duckdb.org/docs/sql/functions/nested#list-functions
[5] https://docs.snowflake.com/en/sql-reference/functions/array_slice

P.S. For some vendors this behavior is not specified explicitly in the docs; 
however, it can be double-checked against the latest versions (I verified it 
myself)

> Add SLICE support in SQL & Table API
> 
>
> Key: FLINK-32260
> URL: https://issues.apache.org/jira/browse/FLINK-32260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Implement the {{array_slice}} function to extract a subset of elements from 
> an array.
> Description: The {{array_slice}} function in the ETL pipeline allows you to 
> extract a subset of elements from an array based on the specified starting 
> index and length. It supports both positive and negative indices, where 
> positive indices start from 1 (the first element) and negative indices start 
> from the end of the array (-1 being the last element).
> Syntax:
>  
> code
> {code:java}
> array_slice[x: array, start: int, length: int] -> array{code}
> {{ }}
> Arguments:
>  * {{{}x{}}}: The input array from which to extract the subset of elements.
>  * {{{}start{}}}: The starting index of the subset. If positive, it 
> represents the index from the beginning of the array. If negative, it 
> represents the index from the end of the array (-1 being the last element).
>  * {{{}length{}}}: The length of the subset to be extracted.
> Returns: An array containing the subset of elements extracted from the input 
> array {{{}x{}}}. The subset starts from the specified {{start}} index and has 
> the specified {{{}length{}}}.
> Examples:
>  # Extracting a subset from an array starting from index 2 with length 2:
>  
> {code:java}
> array_slice[array[1, 2, 3, 4], 2, 2]  Output: [2, 3]{code}
>      2. Extracting a subset from an array starting from the second-to-last 
> element with length 
>  
> {code:java}
> array_slice[array[1, 2, 3, 4], -2, 2]
> Output: [3, 4]{code}
> see also:
> spark:[https://spark.apache.org/docs/latest/api/sql/index.html#slice]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #22834: [FLINK-32260]-table-Add-ARRAY_SLICE-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1242654328


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SLICE}. */
+@Internal
+public class ArraySliceFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_SLICE, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(ArrayData array, Integer start, Integer length) {
+        try {
+            if (array == null
+                    || start == null
+                    || length == null
+                    || start == 0
+                    || Math.abs(start) > array.size()
+                    || length <= 0) {
+                return null;
+            }
+
+            int startIndex = (start > 0) ? start - 1 : array.size() + start;
+            int endIndex = Math.min(startIndex + length, array.size());
+
+            if (startIndex >= endIndex) {
+                return null;
+            }
+

Review Comment:
   if the slice covers the whole array (i.e. `startIndex == 0 && endIndex >= 
array.size()`), there should be a shortcut to return the array itself without 
creating extra objects
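   
   A minimal sketch of that shortcut (my illustration, not code from the PR), 
placed right after `endIndex` is computed:
   
   ```java
   // The slice covers the whole array: reuse the input ArrayData
   // instead of copying its elements into a new object.
   if (startIndex == 0 && endIndex >= array.size()) {
       return array;
   }
   ```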



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22834: [FLINK-32260]-table-Add-ARRAY_SLICE-function

2023-06-26 Thread via GitHub


snuyanzin commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1242650686


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -417,6 +418,173 @@ private Stream<TestSetSpec> arrayReverseTestCases() {
                 DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE());
 }
 
+    private Stream<TestSetSpec> arraySliceTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REVERSE)

Review Comment:
   Don't we test `ARRAY_SLICE`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


