[jira] [Commented] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738887#comment-17738887
 ] 

Rui Fan commented on FLINK-32478:
-

I have submitted 3 PRs (master, 1.16, 1.17) for FLINK-32495; their changes are 
the same, and they will run CI 3 times. Do you think that's ok? Or do you want 
to run CI 3 times for the master PR?

BTW, I have run 
`SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups` more than 
1K times locally. It is fine now with the FLINK-32495 fix.

You can check the test at this 
[comment|https://issues.apache.org/jira/browse/FLINK-32495?focusedCommentId=17738873&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17738873].

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start because the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8613
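
The failure mode in that root cause can be illustrated with a minimal, 
self-contained sketch (plain JDK code, not the actual Flink test):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SharedPoolDemo {
    public static void main(String[] args) {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        shared.submit(() -> System.out.println("first source runs"));
        // The first source shuts the shared pool down when it closes...
        shared.shutdown();
        // ...so the second source's task is rejected and it never starts:
        shared.submit(() -> System.out.println("second source never runs"));
        // throws java.util.concurrent.RejectedExecutionException
    }
}
{code}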



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32496) Sources with idleness and alignment always wait for alignment when part of multiple sources is idle

2023-06-29 Thread haishui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738886#comment-17738886
 ] 

haishui commented on FLINK-32496:
-

A source will be marked idle if it waits for alignment for a long time. 
Is this a bug?

> Sources with idleness and alignment always wait for alignment when part of 
> multiple sources is idle
> ---
>
> Key: FLINK-32496
> URL: https://issues.apache.org/jira/browse/FLINK-32496
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.2, 1.17.1
>Reporter: haishui
>Priority: Major
>
> Sources with idleness and alignment always wait for alignment when part of 
> multiple sources is idle.
> *Root cause:*
> In 
> [SourceOperator|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java],
>  `lastEmittedWatermark` is Long.MAX_VALUE if a source is idle.
> When another source is active, the `currentMaxDesiredWatermark` is less than 
> Long.MAX_VALUE.
> So the `shouldWaitForAlignment` method always returns true for idle sources.
>  
> What's more, a source will become idle if it waits for alignment for a long 
> time, which should also be considered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32496) Sources with idleness and alignment always wait for alignment when part of multiple sources is idle

2023-06-29 Thread haishui (Jira)
haishui created FLINK-32496:
---

 Summary: Sources with idleness and alignment always wait for 
alignment when part of multiple sources is idle
 Key: FLINK-32496
 URL: https://issues.apache.org/jira/browse/FLINK-32496
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.17.1, 1.16.2
Reporter: haishui


Sources with idleness and alignment always wait for alignment when part of 
multiple sources is idle.

*Root cause:*

In 
[SourceOperator|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java],
 `lastEmittedWatermark` is Long.MAX_VALUE if a source is idle.

When another source is active, the `currentMaxDesiredWatermark` is less than 
Long.MAX_VALUE.

So the `shouldWaitForAlignment` method always returns true for idle sources.

 

What's more, a source will become idle if it waits for alignment for a long 
time, which should also be considered.
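
A minimal numeric illustration of that interaction (the variable names follow 
the description above; this is not the actual SourceOperator code):

{code:java}
public class IdleAlignmentDemo {
    public static void main(String[] args) {
        // An idle source reports the maximum possible watermark.
        long lastEmittedWatermark = Long.MAX_VALUE;
        // The desired watermark is driven by the still-active source, so it stays finite.
        long currentMaxDesiredWatermark = 1_000L;
        // Long.MAX_VALUE > 1_000 always holds, so the idle source waits forever.
        boolean shouldWaitForAlignment = lastEmittedWatermark > currentMaxDesiredWatermark;
        System.out.println(shouldWaitForAlignment); // prints: true
    }
}
{code}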



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738884#comment-17738884
 ] 

Sergey Nuyanzin edited comment on FLINK-32478 at 6/30/23 5:34 AM:
--

[~fanrui] thanks for working on this

I wonder whether it is possible to retry CI (for the same commit in the PR) 2-3 
times in a row to be a bit more sure that this or a new related issue doesn't 
appear?


was (Author: sergey nuyanzin):
[~fanrui]thanks for working on this

I wonder whether it is possible to retry ci (for the same commit in PR) 2-3 
times in a row to be a bit more sure that this or new related issue doesn't 
appear?

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start because the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738884#comment-17738884
 ] 

Sergey Nuyanzin edited comment on FLINK-32478 at 6/30/23 5:34 AM:
--

[~fanrui] thanks for working on this

I wonder whether it is possible to retry CI (for the same commit in the PR) 2-3 
times in a row to be a bit more sure that this or a new related issue doesn't 
appear?


was (Author: sergey nuyanzin):
[~fanrui]thanks for working on this

I wonder whether it is possible to retry ci (for the same commit in PR) 2-3 
times in a row to be a bit more sure that this or new related issue doesn't 
appear

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start because the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738884#comment-17738884
 ] 

Sergey Nuyanzin commented on FLINK-32478:
-

[~fanrui] thanks for working on this

I wonder whether it is possible to retry CI (for the same commit in the PR) 2-3 
times in a row to be a bit more sure that this or a new related issue doesn't 
appear

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start because the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32495) SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails

2023-06-29 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738873#comment-17738873
 ] 

Rui Fan commented on FLINK-32495:
-

I added a @RepeatedTest(500) annotation to 
SourceCoordinatorAlignmentTest#testWatermarkAlignmentWithTwoGroups locally.

Master branch: 234 of 500 tests failed, 266 passed.

Master with this fix: 500 of 500 tests passed.
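
A minimal sketch of that local reproduction setup, assuming JUnit 5 (the test 
body is a stand-in for the real test logic):

{code:java}
import org.junit.jupiter.api.RepeatedTest;

class SourceCoordinatorAlignmentRepeatTest {

    // Runs the annotated test 500 times; any single failing repetition fails
    // the build, which makes a flaky concurrency bug like FLINK-32495 easy to
    // reproduce locally.
    @RepeatedTest(500)
    void testWatermarkAlignmentWithTwoGroups() {
        // ... original test logic from SourceCoordinatorAlignmentTest ...
    }
}
{code}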

 

!image-2023-06-30-13-01-26-519.png|width=1139,height=256!

!image-2023-06-30-13-02-32-803.png|width=,height=245!

> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails
> 
>
> Key: FLINK-32495
> URL: https://issues.apache.org/jira/browse/FLINK-32495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2023-06-30-10-53-50-280.png, 
> image-2023-06-30-13-01-26-519.png, image-2023-06-30-13-02-32-803.png
>
>
> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
> I analyzed this CI: 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=9089]
> h1. Root cause:
>  * The CoordinatorExecutorThreadFactory cannot create multiple threads, and 
> many callers check 
> `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
> SourceCoordinatorContext.attemptReady.
>  * The CoordinatorExecutorThreadFactory is shared at 
> [SourceCoordinatorTestBase|https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68]
>  * It is used by multiple source coordinators, and the second source 
> coordinator overwrites CoordinatorExecutorThreadFactory#t, so the 
> check fails for the first source.
> h1. Solution:
> Don't share the CoordinatorExecutorThreadFactory.
> h1. log:
> !image-2023-06-30-10-53-50-280.png!
>  
>  
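
The failure those bullets describe can be sketched in a few lines (illustrative 
only; the field and method names follow the description above, not the actual 
Flink source):

{code:java}
import java.util.concurrent.ThreadFactory;

class CoordinatorExecutorThreadFactory implements ThreadFactory {

    // Remembers the single coordinator thread this factory created.
    private Thread t;

    @Override
    public synchronized Thread newThread(Runnable r) {
        // Each call overwrites the remembered thread: if two source
        // coordinators share one factory, the second newThread() call makes
        // the first coordinator's thread fail the check below.
        t = new Thread(r, "SourceCoordinator-thread");
        return t;
    }

    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == t;
    }
}
{code}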



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32495) SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails

2023-06-29 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32495:

Attachment: image-2023-06-30-13-01-26-519.png

> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails
> 
>
> Key: FLINK-32495
> URL: https://issues.apache.org/jira/browse/FLINK-32495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2023-06-30-10-53-50-280.png, 
> image-2023-06-30-13-01-26-519.png
>
>
> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
> I analyzed this CI: 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=9089]
> h1. Root cause:
>  * The CoordinatorExecutorThreadFactory cannot create multiple threads, and 
> many callers check 
> `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
> SourceCoordinatorContext.attemptReady.
>  * The CoordinatorExecutorThreadFactory is shared at 
> [SourceCoordinatorTestBase|https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68]
>  * It is used by multiple source coordinators, and the second source 
> coordinator overwrites CoordinatorExecutorThreadFactory#t, so the 
> check fails for the first source.
> h1. Solution:
> Don't share the CoordinatorExecutorThreadFactory.
> h1. log:
> !image-2023-06-30-10-53-50-280.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32495) SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails

2023-06-29 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32495:

Attachment: image-2023-06-30-13-02-32-803.png

> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails
> 
>
> Key: FLINK-32495
> URL: https://issues.apache.org/jira/browse/FLINK-32495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2023-06-30-10-53-50-280.png, 
> image-2023-06-30-13-01-26-519.png, image-2023-06-30-13-02-32-803.png
>
>
> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
> I analyzed this CI: 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=9089]
> h1. Root cause:
>  * The CoordinatorExecutorThreadFactory cannot create multiple threads, and 
> many callers check 
> `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
> SourceCoordinatorContext.attemptReady.
>  * The CoordinatorExecutorThreadFactory is shared at 
> [SourceCoordinatorTestBase|https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68]
>  * It is used by multiple source coordinators, and the second source 
> coordinator overwrites CoordinatorExecutorThreadFactory#t, so the 
> check fails for the first source.
> h1. Solution:
> Don't share the CoordinatorExecutorThreadFactory.
> h1. log:
> !image-2023-06-30-10-53-50-280.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32461) manage union operator state increase very large in Jobmanager

2023-06-29 Thread wgcn (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wgcn updated FLINK-32461:
-
Summary: manage  union operator state increase very large in Jobmanager   
(was: manage operator state increase very large )

> manage  union operator state increase very large in Jobmanager 
> ---
>
> Key: FLINK-32461
> URL: https://issues.apache.org/jira/browse/FLINK-32461
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: wgcn
>Priority: Major
> Attachments: image-2023-06-28-16-24-11-538.png, screenshot-1.png
>
>
> This issue doesn't usually occur, but it happens during busy nights when the 
> machines are more active. The "manage operator state" will increase 
> significantly, and I found that the number of union operator state objects is 
> 128, the same as the parallelism. Should the union state only be loaded 
> once?
>  !screenshot-1.png! 
>  !image-2023-06-28-16-24-11-538.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-32461) manage operator state increase very large

2023-06-29 Thread wgcn (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32461 ]


wgcn deleted comment on FLINK-32461:
--

was (Author: 1026688210):
>> 
 This issue doesn't usually occur, but it happens during busy nights when 
the machines are more active. The "manage operator state" will increase 
significantly, and I can see that it mostly stores Kafka offsets inside.

The task only has 2 topics, and I haven't figured out why the state is so 
large.

> manage operator state increase very large 
> --
>
> Key: FLINK-32461
> URL: https://issues.apache.org/jira/browse/FLINK-32461
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: wgcn
>Priority: Major
> Attachments: image-2023-06-28-16-24-11-538.png, screenshot-1.png
>
>
> This issue doesn't usually occur, but it happens during busy nights when the 
> machines are more active. The "manage operator state" will increase 
> significantly, and I found that the number of union operator state objects is 
> 128, the same as the parallelism. Should the union state only be loaded 
> once?
>  !screenshot-1.png! 
>  !image-2023-06-28-16-24-11-538.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32461) manage operator state increase very large

2023-06-29 Thread wgcn (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wgcn updated FLINK-32461:
-
Description: 
This issue doesn't usually occur, but it happens during busy nights when the 
machines are more active. The "manage operator state" will increase 
significantly, and I found that the number of union operator state objects is 
128, the same as the parallelism. Should the union state only be loaded once?

 !screenshot-1.png! 
 !image-2023-06-28-16-24-11-538.png! 


  was:
This issue doesn't usually occur, but it happens during busy nights when the 
machines are more active. The "manage operator state" will increase 
significantly, and I found the number of  operator union state object is 128 
,same with the parallelism .Whether the union state only needs to be loaded 
once?
 !screenshot-1.png! 
 !image-2023-06-28-16-24-11-538.png! 



> manage operator state increase very large 
> --
>
> Key: FLINK-32461
> URL: https://issues.apache.org/jira/browse/FLINK-32461
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: wgcn
>Priority: Major
> Attachments: image-2023-06-28-16-24-11-538.png, screenshot-1.png
>
>
> This issue doesn't usually occur, but it happens during busy nights when the 
> machines are more active. The "manage operator state" will increase 
> significantly, and I found that the number of union operator state objects is 
> 128, the same as the parallelism. Should the union state only be loaded 
> once?
>  !screenshot-1.png! 
>  !image-2023-06-28-16-24-11-538.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32461) manage operator state increase very large

2023-06-29 Thread wgcn (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wgcn updated FLINK-32461:
-
Description: 
This issue doesn't usually occur, but it happens during busy nights when the 
machines are more active. The "manage operator state" will increase 
significantly, and I found that the number of union operator state objects is 
128, the same as the parallelism. Should the union state only be loaded once?
 !screenshot-1.png! 
 !image-2023-06-28-16-24-11-538.png! 


  was:
This issue doesn't usually occur, but it happens during busy nights when the 
machines are more active. The "manage operator state" will increase 
significantly, and I can see that it stores Kafka offsets inside mostly.
 !screenshot-1.png! 
 !image-2023-06-28-16-24-11-538.png! 



> manage operator state increase very large 
> --
>
> Key: FLINK-32461
> URL: https://issues.apache.org/jira/browse/FLINK-32461
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: wgcn
>Priority: Major
> Attachments: image-2023-06-28-16-24-11-538.png, screenshot-1.png
>
>
> This issue doesn't usually occur, but it happens during busy nights when the 
> machines are more active. The "manage operator state" will increase 
> significantly, and I found that the number of union operator state objects is 
> 128, the same as the parallelism. Should the union state only be loaded 
> once?
>  !screenshot-1.png! 
>  !image-2023-06-28-16-24-11-538.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on a diff in pull request #22805: [FLINK-32365][orc]get orc table statistics in parallel

2023-06-29 Thread via GitHub


luoyuxia commented on code in PR #22805:
URL: https://github.com/apache/flink/pull/22805#discussion_r1247405915


##
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java:
##
@@ -54,15 +59,31 @@ public class OrcFormatStatisticsReportUtil {
 
 public static TableStats getTableStatistics(
List<Path> files, DataType producedDataType, Configuration 
hadoopConfig) {
+return getTableStatistics(
+files, producedDataType, hadoopConfig, 
Runtime.getRuntime().availableProcessors());
+}
+
+public static TableStats getTableStatistics(
+List<Path> files,
+DataType producedDataType,
+Configuration hadoopConfig,
+int statisticsThreadNum) {
 try {
 long rowCount = 0;
Map<String, ColumnStatistics> columnStatisticsMap = new 
HashMap<>();
 RowType producedRowType = (RowType) 
producedDataType.getLogicalType();
+
+ExecutorService executorService = 
Executors.newFixedThreadPool(statisticsThreadNum);

Review Comment:
   ```
   Executors.newFixedThreadPool(
           statisticsThreadNum,
           new ExecutorThreadFactory("orc-get-table-statistic-worker"));
   ```
   ?



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##
@@ -373,13 +374,18 @@ private TableStats getMapRedInputFormatStatistics(
 .toLowerCase();
List<Path> files =
        inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+int statisticsThreadNum = 
flinkConf.get(TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM);

Review Comment:
   Check that the thread num is not less than 1.
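
   For example (an illustrative sketch, not the PR's actual code; it assumes 
Flink's `org.apache.flink.util.Preconditions` utility):
   ```java
   int statisticsThreadNum =
           flinkConf.get(TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM);
   // Reject non-positive thread counts before creating the thread pool.
   Preconditions.checkArgument(
           statisticsThreadNum >= 1,
           "The statistics thread number must be at least 1, but was %s.",
           statisticsThreadNum);
   ```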



##
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java:
##
@@ -70,12 +74,34 @@ public static TableStats getTableStatistics(
 DataType producedDataType,
 Configuration hadoopConfig,
 boolean isUtcTimestamp) {
+return getTableStatistics(
+files,
+producedDataType,
+hadoopConfig,
+isUtcTimestamp,
+Runtime.getRuntime().availableProcessors());
+}
+
+public static TableStats getTableStatistics(
+List<Path> files,
+DataType producedDataType,
+Configuration hadoopConfig,
+boolean isUtcTimestamp,
+int statisticsThreadNum) {
 try {
Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
 RowType producedRowType = (RowType) 
producedDataType.getLogicalType();
+ExecutorService executorService = 
Executors.newFixedThreadPool(statisticsThreadNum);

Review Comment:
   ditto



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##
@@ -134,6 +134,13 @@ public class HiveOptions {
 + " custom: use policy class to create a 
commit policy."
 + " Support to configure multiple 
policies: 'metastore,success-file'.");
 
+public static final ConfigOption<Integer> 
TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =

Review Comment:
   Please remember to add the doc for the newly added option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22914: [FLINK-32281][table-planner] Enable two-phase HashAgg default when all aggregate functions in query support adaptive local HashAgg

2023-06-29 Thread via GitHub


flinkbot commented on PR #22914:
URL: https://github.com/apache/flink/pull/22914#issuecomment-1614094241

   
   ## CI report:
   
   * d25232d1bb3fc7a1e0e170b9bc40845db923bd1c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


reswqa commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247405254


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManagerTest.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/** Tests for {@link SubpartitionDiskCacheManager}. */
+class SubpartitionDiskCacheManagerTest {
+
+@Test
+void testAppendBuffer() {
+int bufferBytes = 5;
+SubpartitionDiskCacheManager subpartitionDiskCacheManager =
+new SubpartitionDiskCacheManager();
+Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer(bufferBytes);
+subpartitionDiskCacheManager.append(buffer);
+List<Tuple2<Buffer, Integer>> bufferAndIndexes =
+subpartitionDiskCacheManager.removeAllBuffers();
+
+assertThat(bufferAndIndexes).hasSize(1);
+assertThat(bufferAndIndexes.get(0)).isNotNull();
+
assertThat(bufferAndIndexes.get(0).f0.readableBytes()).isEqualTo(bufferBytes);
+}
+
+@Test
+void testAppendEndOfSegmentRecord() throws IOException {
+SubpartitionDiskCacheManager subpartitionDiskCacheManager =
+new SubpartitionDiskCacheManager();
+subpartitionDiskCacheManager.appendEndOfSegmentEvent(
+EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE),
+Buffer.DataType.END_OF_SEGMENT);
+List<Tuple2<Buffer, Integer>> bufferAndIndexes =
+subpartitionDiskCacheManager.removeAllBuffers();
+
+assertThat(bufferAndIndexes).hasSize(1);
+assertThat(bufferAndIndexes.get(0)).isNotNull();
+Buffer buffer = bufferAndIndexes.get(0).f0;
+assertThat(buffer.isBuffer()).isFalse();
+AbstractEvent event =
+EventSerializer.fromSerializedEvent(
+buffer.readOnlySlice().getNioBufferReadable(), 
getClass().getClassLoader());
+assertThat(event).isInstanceOf(EndOfSegmentEvent.class);
+}
+
+@Test
+void testRelease() {
+SubpartitionDiskCacheManager subpartitionDiskCacheManager =
+new SubpartitionDiskCacheManager();
+
subpartitionDiskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer());
+
assertThatNoException().isThrownBy(subpartitionDiskCacheManager::release);

Review Comment:
   We'd better check that this specific buffer is recycled.
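
   One way to assert that (an illustrative sketch; it relies on the 
`Buffer#isRecycled` accessor and the helpers already used in this test):
   ```java
   Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer();
   subpartitionDiskCacheManager.append(buffer);
   subpartitionDiskCacheManager.release();
   // The cache manager should recycle buffers it still holds when released.
   assertThat(buffer.isRecycled()).isTrue();
   ```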



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on pull request #22882: [FLINK-32370][runtime] Don't print error log which will cause sql gateway e2e test to fail due to the .out file is not empty

2023-06-29 Thread via GitHub


FangYongs commented on PR #22882:
URL: https://github.com/apache/flink/pull/22882#issuecomment-1614089480

   Thanks @zentol, sorry for my insufficient understanding of the e2e tests; 
your suggestion is great. I have updated this PR so that the JDBC driver e2e 
test skips the exception check and verifies the results in the test itself. 
Could you have a look again? Thanks so much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32281) Enable two-phase HashAgg default when agg function support adaptive local HashAgg

2023-06-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32281:
---
Labels: pull-request-available  (was: )

> Enable two-phase HashAgg default when agg function support adaptive local 
> HashAgg
> -
>
> Key: FLINK-32281
> URL: https://issues.apache.org/jira/browse/FLINK-32281
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: dalongliu
>Priority: Major
>  Labels: pull-request-available
>
> For the HashAgg operator, the planner currently prefers a one-phase agg when 
> the statistics cannot be accurately estimated. In some production queries, it 
> may be more reasonable to choose a two-phase agg. In the TPC-DS cases, we find 
> that for some patterns choosing a two-stage agg significantly reduces the 
> query runtime. In 
> https://issues.apache.org/jira/browse/FLINK-30542 , we have introduced the 
> adaptive local hashagg, which can adaptively skip aggregation when the local 
> phase aggregation degree is relatively low; this can greatly improve the 
> performance of two-phase aggregation in some queries. Based on the above 
> background, in this issue we propose to turn on two-phase agg by default for 
> functions that support adaptive local hashagg, such as sum/count/min/max, 
> etc., so as to exploit the ability of adaptive local hashagg to improve the 
> performance of agg queries. For OFCG, if we turn on two-phase agg by default, 
> we can also let the local agg operator be put into the fused operator, so as 
> to enjoy the benefit from OFCG.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu opened a new pull request, #22914: [FLINK-32281][table-planner] Enable two-phase HashAgg default when all aggregate functions in query support adaptive local HashAgg

2023-06-29 Thread via GitHub


lsyldliu opened a new pull request, #22914:
URL: https://github.com/apache/flink/pull/22914

   ## What is the purpose of the change
   
   *For the HashAgg operator, the planner currently prefers a one-phase agg 
when the statistics cannot be accurately estimated. In some production queries, 
it may be more reasonable to choose a two-phase agg. In the TPC-DS cases, we 
find that for some patterns choosing a two-stage agg significantly reduces the 
query runtime. In https://issues.apache.org/jira/browse/FLINK-30542 , we have 
introduced the adaptive local hashagg, which can adaptively skip aggregation 
when the local phase aggregation degree is relatively low; this can greatly 
improve the performance of two-phase aggregation in some queries. Based on the 
above background, in this issue we propose to turn on two-phase agg by default 
for functions that support adaptive local hashagg, such as sum/count/min/max, 
etc., so as to exploit the ability of adaptive local hashagg to improve the 
performance of agg queries. For OFCG, if we turn on two-phase agg by default, 
we can also let the local agg operator be put into the fused operator, so as to 
enjoy the benefit from OFCG.*
   
   
   ## Brief change log
   
 - *Enable two-phase HashAgg default when all aggregate functions in query 
support adaptive local HashAgg*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on a diff in pull request #22807: [FLINK-31645][network] Introduce the PartitionFileReader and DiskIOSchduler

2023-06-29 Thread via GitHub


xintongsong commented on code in PR #22807:
URL: https://github.com/apache/flink/pull/22807#discussion_r1247388802


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
+import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.Region;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The implementation of {@link PartitionFileReader} with producer-merge mode. 
In this mode, the
+ * shuffle data is written in the producer side, the consumer side need to 
read multiple producers
+ * to get its partition data.
+ *
+ * Note that one partition file may contain the data of multiple 
subpartitions.
+ */
+public class ProducerMergedPartitionFileReader implements PartitionFileReader {
+
+/**
+ * Max number of caches.
+ *
+ * The constant defines the maximum number of caches that can be 
created. Its value is set to
+ * 1, which is considered sufficient for most parallel jobs. Each 
cache only contains
+ * references and numerical variables and occupies a minimal amount of 
memory so the value is
+ * not excessively large.
+ */
+private static final int MAX_CACHE_NUM = 1;
+
+/**
+ * Buffer offset caches stored in map.
+ *
+ * The key is the combination of {@link TieredStorageSubpartitionId} 
and buffer index. The
+ * value is the buffer offset cache, which includes file offset of the 
buffer index, the region
+ * containing the buffer index and next buffer index to consume.
+ */
+private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, 
BufferOffsetCache>
+bufferOffsetCaches;
+
+private final ByteBuffer reusedHeaderBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+private final Path dataFilePath;
+
+private final ProducerMergedPartitionFileIndex dataIndex;
+
+private FileChannel fileChannel;
+
+/** The current number of caches. */
+private int numCaches;
+
+ProducerMergedPartitionFileReader(
+Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
+this.dataFilePath = dataFilePath;
+this.dataIndex = dataIndex;
+this.bufferOffsetCaches = new HashMap<>();
+}
+
+@Override
+public Buffer readBuffer(
+TieredStoragePartitionId partitionId,
+TieredStorageSubpartitionId subpartitionId,
+int segmentId,
+int bufferIndex,
+MemorySegment memorySegment,
+BufferRecycler recycler)
+throws IOException {
+
+lazyInitializeFileChannel();
+Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
+Tuple2.of(subpartitionId, bufferIndex);
+Optional<BufferOffsetCache> cache = tryGetCache(cacheKey);
+if (!cache.isPresent()) {
+return null;
+}
+fileChannel.position(cache.get().getFileOffset());
+Buffer buffer =
+readFromByteChannel(fileChannel, 

[GitHub] [flink] flinkbot commented on pull request #22913: [FLINK-31843][runtime] Improve performance of SlotSelectionStrategy by bulk select.

2023-06-29 Thread via GitHub


flinkbot commented on PR #22913:
URL: https://github.com/apache/flink/pull/22913#issuecomment-1614080815

   
   ## CI report:
   
   * e6a9390c48c5954f1480e3cb4c5c6d9d02e8fe56 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


reswqa commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247380917


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers 
before flushing to files.
+ */
+class DiskCacheManager {
+
+private final TieredStoragePartitionId partitionId;
+
+private final int numSubpartitions;
+
+private final PartitionFileWriter partitionFileWriter;
+
+private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+/** Whether the current flush process has completed. */
+private CompletableFuture<Void> hasFlushCompleted;
+
+DiskCacheManager(
+TieredStoragePartitionId partitionId,
+int numSubpartitions,
+TieredStorageMemoryManager storageMemoryManager,
+PartitionFileWriter partitionFileWriter) {
+this.partitionId = partitionId;
+this.numSubpartitions = numSubpartitions;
+this.partitionFileWriter = partitionFileWriter;
+this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
+this.hasFlushCompleted = FutureUtils.completedVoidFuture();
+
+for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+subpartitionCacheManagers[subpartitionId] = new 
SubpartitionDiskCacheManager();
+}
+
storageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
+}
+
+// 
+//  Called by DiskTierProducerAgent
+// 
+
+/**
+ * Append buffer to {@link DiskCacheManager}.
+ *
+ * @param buffer to be managed by this class.
+ * @param subpartitionId the subpartition of this record.
+ */
+void append(Buffer buffer, int subpartitionId) {
+subpartitionCacheManagers[subpartitionId].append(buffer);
+}
+
+/**
+ * Append the end-of-segment event to {@link DiskCacheManager}, which 
indicates the segment has
+ * finished.
+ *
+ * @param record the end-of-segment event
+ * @param subpartitionId target subpartition of this record.
+ * @param dataType the type of this record. In other words, is it data or 
event.
+ */
+void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId, 
Buffer.DataType dataType) {

Review Comment:
   What confuses me is: this method is called `appendEndOfSegmentEvent`; isn't 
this implying that it must be an event? Is it possible that it will be `Data` 
in the future? 
   
   If we have a method called `emitEvent(Buffer event)` in a class `A`, I 
don't think `A` can be considered exactly aware of the specific event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31843) Select slots from SlotPool#freeSlots in bulk

2023-06-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31843:
---
Labels: pull-request-available  (was: )

> Select slots from SlotPool#freeSlots in bulk
> 
>
> Key: FLINK-31843
> URL: https://issues.apache.org/jira/browse/FLINK-31843
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> We should also reduce the number of calls to "getFreeSlotInformations". In 
> the current implementation, the scheduler will batch-request slots for tasks 
> in the same pipeline region (ExecutionSlotAllocator#allocateSlotsFor), but the 
> slot allocator will process these requests one by one, and call 
> "getFreeSlotInformations" once for each request.
> We can optimize it to call "getFreeSlotInformations" once per bulk (of slot 
> requests), instead of once per slot request. 
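
The intended change can be sketched as follows (illustrative Java only; the 
type and method names are assumptions for the sketch, not the actual scheduler 
API):

{code:java}
// Before: one free-slot snapshot per slot request.
// for (SlotRequest request : bulk) {
//     Collection<SlotInfo> freeSlots = slotPool.getFreeSlotInformations();
//     select(freeSlots, request);
// }

// After: one free-slot snapshot for the whole bulk of slot requests.
Collection<SlotInfo> freeSlots = slotPool.getFreeSlotInformations();
for (SlotRequest request : bulk) {
    SlotInfo selected = selectionStrategy.selectBestSlot(freeSlots, request);
    // Remove the chosen slot so later requests in the bulk don't pick it again.
    freeSlots.remove(selected);
}
{code}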



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] huwh opened a new pull request, #22913: [FLINK-31843][runtime] Improve performance of SlotSelectionStrategy by bulk select.

2023-06-29 Thread via GitHub


huwh opened a new pull request, #22913:
URL: https://github.com/apache/flink/pull/22913

   ## What is the purpose of the change
   Improving the performance of the SlotSelectionStrategy by bulk-selecting 
slots for tasks in the same region.
   
   
   ## Brief change log
 - *Introduce FreeSlotCandidatesManager to manage available slots for 
candidates*
- *Migrate SlotInfoWithUtilization to FreeSlotCandidatesManager for 
SlotSelectionStrategy*
 - *PhysicalSlotProvider support bulk select available slots*
   
   
   ## Verifying this change
 - *Added unit tests for FreeSlotCandidatesManager*
- *Manually verified the change by a wordcount job with parallelism 2*
 - *Compare the benchmark of handleGlobalFailureAndRestartAllTasks*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  (no)
 - The serializers:  (no)
 - The runtime per-record code paths (performance sensitive):  (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] curcur commented on pull request #22905: [FLINK-32485][state-backends] Support build flink-state-backend-changelog test-jar

2023-06-29 Thread via GitHub


curcur commented on PR #22905:
URL: https://github.com/apache/flink/pull/22905#issuecomment-1614068423

   "test-jar" is usually used to share test code between modules.
   
   If I remember correctly, there should not be any other module sharing the 
test code of the changelog (it has changed quite a lot, so I may be 
mistaken).
   
   Do you know where `ChangelogStateBackendTestUtils` is used (other than 
flink-statebackend-changelog)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32485) Flink State Backend Changelog should support build test-jar

2023-06-29 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738861#comment-17738861
 ] 

Yuan Mei edited comment on FLINK-32485 at 6/30/23 3:34 AM:
---

I can take a look

 

"test-jar" is usually used to share test code between modules.

If I remember correctly, there should not be any other module sharing the test 
code of the changelog (it has changed quite a lot, so I may be 
mistaken).

Do you know where "ChangelogStateBackendTestUtils" is used (other than 
flink-statebackend-changelog)?


was (Author: ym):
I can take a look

 

"test-jar" is usually used to share test code between modules.

If I remember correctly, there should not be another module sharing the test 
code of the changelog (It has been changed quite a lot, so I may make a 
mistake).

Do you know where "ChangelogStateBackendTestUtils" is used (other than 
flink-statebackend-changelog)?

> Flink State Backend Changelog should support build test-jar
> ---
>
> Key: FLINK-32485
> URL: https://issues.apache.org/jira/browse/FLINK-32485
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> In some scenarios, executing unit tests will report the following error. 
> Since the flink-statebackend-changelog tests contain some util classes, we 
> should build a test jar like the flink-statebackend-rocksdb backend does.
> {code:java}
> /Users/xxx/github/flink/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java:29:37
> java: Package org.apache.flink.changelog.fs not exist {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32485) Flink State Backend Changelog should support build test-jar

2023-06-29 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738861#comment-17738861
 ] 

Yuan Mei edited comment on FLINK-32485 at 6/30/23 3:34 AM:
---

I can take a look

 

"test-jar" is usually used to share test code between modules.

If I remember correctly, there should not be any other module sharing the test 
code of the changelog (it has changed quite a lot, so I may be 
mistaken).

Do you know where `ChangelogStateBackendTestUtils` is used (other than 
flink-statebackend-changelog)?


was (Author: ym):
I can take a look

 

"test-jar" is usually used to share test code between modules.

If I remember correctly, there should not be any other module sharing the test 
code of the changelog (It has been changed quite a lot, so I may make a 
mistake).

Do you know where "ChangelogStateBackendTestUtils" is used (other than 
flink-statebackend-changelog)?

> Flink State Backend Changelog should support build test-jar
> ---
>
> Key: FLINK-32485
> URL: https://issues.apache.org/jira/browse/FLINK-32485
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> In some scenarios, executing unit tests will report the following errors. In 
> fact, since flink-state-backend-changelog test contains some util classes, we 
> should build test jar like flink-rocks-db backend.
> {code:java}
> /Users/xxx/github/flink/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java:29:37
> java: Package org.apache.flink.changelog.fs not exist {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32485) Flink State Backend Changelog should support build test-jar

2023-06-29 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738861#comment-17738861
 ] 

Yuan Mei commented on FLINK-32485:
--

I can take a look

 

"test-jar" is usually used to share test code between modules.

If I remember correctly, there should not be another module sharing the test 
code of the changelog (it has been changed quite a lot, so I may be mistaken).

Do you know where "ChangelogStateBackendTestUtils" is used (other than 
flink-statebackend-changelog)?

> Flink State Backend Changelog should support build test-jar
> ---
>
> Key: FLINK-32485
> URL: https://issues.apache.org/jira/browse/FLINK-32485
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.17.1
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> In some scenarios, executing unit tests reports the following error. In 
> fact, since the flink-statebackend-changelog tests contain some util classes, we 
> should build a test jar, like the RocksDB state backend does.
> {code:java}
> /Users/xxx/github/flink/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java:29:37
> java: Package org.apache.flink.changelog.fs not exist {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738858#comment-17738858
 ] 

Rui Fan commented on FLINK-32478:
-

Hi [~martijnvisser]  [~Sergey Nuyanzin] [~mapohl] 

Sorry, it wasn't fully fixed: 
SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails now. I 
have analyzed it and created FLINK-32495; let's follow up there.

BTW, FLINK-32495 doesn't happen every time; I only ran CI on the master branch 
before merging, and merged the change for master, release-1.16 and release-1.17 
after the master CI passed.

In the future I will submit a separate PR for each release branch and check all 
the CI runs, to avoid this kind of mistake.

Sorry again for this mistake.

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start due to the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on a diff in pull request #22831: [FLINK-32388]Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-06-29 Thread via GitHub


luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1247371849


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##
@@ -63,11 +70,26 @@ public List<PartitionCommitPolicy> createPolicyChain(
 successFileName, fsSupplier.get());
 case PartitionCommitPolicy.CUSTOM:
 try {
-return (PartitionCommitPolicy)
-
cl.loadClass(customClass).newInstance();
+if 
(!CollectionUtil.isNullOrEmpty(parameters)) {

Review Comment:
   You can see it in the `CI report` comment from flinkbot.
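
   For context, a minimal sketch of the pattern under review in this hunk: loading a CUSTOM policy class and passing user-supplied parameters through a constructor. The helper name and the `List<String>` constructor contract are hypothetical, not the actual PR code:

```java
import java.util.List;

// Hypothetical sketch: prefer a constructor that accepts the user-supplied
// parameters; fall back to the no-arg constructor when none are configured.
final class CustomPolicyLoader {
    static Object instantiate(ClassLoader cl, String customClass, List<String> parameters)
            throws ReflectiveOperationException {
        Class<?> clazz = cl.loadClass(customClass);
        if (parameters != null && !parameters.isEmpty()) {
            // Assumes the custom policy exposes a MyPolicy(List<String>) constructor.
            return clazz.getConstructor(List.class).newInstance(parameters);
        }
        return clazz.getDeclaredConstructor().newInstance();
    }
}
```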



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22912: [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail

2023-06-29 Thread via GitHub


flinkbot commented on PR #22912:
URL: https://github.com/apache/flink/pull/22912#issuecomment-1614061350

   
   ## CI report:
   
   * c992b37ae080d255484f893e935475cabf59add5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22911: [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail

2023-06-29 Thread via GitHub


flinkbot commented on PR #22911:
URL: https://github.com/apache/flink/pull/22911#issuecomment-1614061014

   
   ## CI report:
   
   * 3c4c80935861a6c7fa77547c42e5e3edd78a703f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #22831: [FLINK-32388]Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-06-29 Thread via GitHub


luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1247371004


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##
@@ -63,11 +70,26 @@ public List<PartitionCommitPolicy> createPolicyChain(
 successFileName, fsSupplier.get());
 case PartitionCommitPolicy.CUSTOM:
 try {
-return (PartitionCommitPolicy)
-
cl.loadClass(customClass).newInstance();
+if 
(!CollectionUtil.isNullOrEmpty(parameters)) {

Review Comment:
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50618=results



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui opened a new pull request, #22912: [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail

2023-06-29 Thread via GitHub


1996fanrui opened a new pull request, #22912:
URL: https://github.com/apache/flink/pull/22912

   ## What is the purpose of the change
   
   SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
   
   I analyzed this CI run:  
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089
   
   Root cause:
   
   - The CoordinatorExecutorThreadFactory cannot create multiple threads, and many 
callers check `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
SourceCoordinatorContext.attemptReady.
   - The CoordinatorExecutorThreadFactory is shared at 
[SourceCoordinatorTestBase](https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68)
   - It is used by multiple source coordinators, and the second source coordinator 
overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first 
source (a minimal sketch of this failure mode follows the change log below).
   
   ## Brief change log
   
   Don't share the CoordinatorExecutorThreadFactory.
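   
   As an illustration of the failure mode, a minimal self-contained sketch (hypothetical class names, not the actual Flink coordinator code): a single-thread factory that remembers the last thread it created reports the wrong owner once a second executor reuses it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical stand-in for CoordinatorExecutorThreadFactory: it remembers the
// single thread it created so that ownership checks can compare against it.
class SingleThreadFactory implements ThreadFactory {
    private volatile Thread t;

    @Override
    public Thread newThread(Runnable r) {
        t = new Thread(r, "coordinator-thread");
        return t;
    }

    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == t;
    }
}

public class SharedFactoryDemo {
    public static void main(String[] args) {
        SingleThreadFactory shared = new SingleThreadFactory();
        ExecutorService first = Executors.newSingleThreadExecutor(shared);
        ExecutorService second = Executors.newSingleThreadExecutor(shared);
        first.submit(() -> {});  // creates the first executor's thread; t = thread 1
        second.submit(() -> {}); // creates the second thread, overwriting t
        // The first executor's own thread no longer passes the check: prints false.
        first.submit(() -> System.out.println(shared.isCurrentThreadCoordinatorThread()));
        first.shutdown();
        second.shutdown();
    }
}
```
   
   Giving each coordinator its own factory, as this PR does, makes the ownership check stable again.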
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-29 Thread via GitHub


luoyuxia commented on code in PR #22839:
URL: https://github.com/apache/flink/pull/22839#discussion_r1246593727


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/** A factory to create tables that support staging, for test purposes. */
+public class TestSupportsStagingTableFactory implements 
DynamicTableSinkFactory {
+
+public static final String IDENTIFIER = "test-staging";
+
+public static final List<String> JOB_STATUS_CHANGE_PROCESS = new 
LinkedList<>();
+
+private static final ConfigOption<String> DATA_DIR =
+ConfigOptions.key("data-dir")
+.stringType()
+.noDefaultValue()
+.withDescription("The data dir used to write the rows.");
+
+@Override
+public DynamicTableSink createDynamicTableSink(Context context) {
+FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+helper.validate();
+String dataDir = helper.getOptions().get(DATA_DIR);
+return new SupportsStagingTableSink(dataDir);
+}
+
+@Override
+public String factoryIdentifier() {
+return IDENTIFIER;
+}
+
+@Override
+public Set<ConfigOption<?>> requiredOptions() {
+return Collections.singleton(DATA_DIR);
+}
+
+@Override
+public Set<ConfigOption<?>> optionalOptions() {
+return Collections.emptySet();
+}
+
+/** A sink that supports staging. */
+private static class SupportsStagingTableSink implements DynamicTableSink, 
SupportsStaging {
+
+private String dataDir;
+private TestStagedTable stagedTable;
+
+public SupportsStagingTableSink(String dataDir) {
+this(dataDir, null);
+}
+
+public SupportsStagingTableSink(String dataDir, TestStagedTable 
stagedTable) {
+this.dataDir = dataDir;
+this.stagedTable = stagedTable;
+}
+
+@Override
+public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+return ChangelogMode.insertOnly();
+}
+
+@Override
+public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+return new DataStreamSinkProvider() {
+@Override
+public DataStreamSink<?> consumeDataStream(
+ProviderContext providerContext, DataStream<RowData> dataStream) {
+if (stagedTable != null) {
+return dataStream
+.addSink(new StagedSinkFunction(dataDir))
+.setParallelism(1);
+} else {
+// otherwise, do nothing
+return dataStream.addSink(new DiscardingSink<>());
+}
+}
+};
+}
+
+@Override
+public DynamicTableSink copy() {
+ 

[GitHub] [flink] flinkbot commented on pull request #22910: [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail

2023-06-29 Thread via GitHub


flinkbot commented on PR #22910:
URL: https://github.com/apache/flink/pull/22910#issuecomment-1614057264

   
   ## CI report:
   
   * ad341d68f6f4bb85bc1050cc5746a7e1524953e8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32495) SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails

2023-06-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32495:
---
Labels: pull-request-available  (was: )

> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails
> 
>
> Key: FLINK-32495
> URL: https://issues.apache.org/jira/browse/FLINK-32495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2023-06-30-10-53-50-280.png
>
>
> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
> I analyzed this CI run:  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089]
> h1. Root cause:
>  * The CoordinatorExecutorThreadFactory cannot create multiple threads, and many 
> callers check `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such 
> as SourceCoordinatorContext.attemptReady.
>  * The CoordinatorExecutorThreadFactory is shared at 
> [SourceCoordinatorTestBase|https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68]
>  * It is used by multiple source coordinators, and the second source coordinator 
> overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first 
> source.
> h1. Solution:
> Don't share the CoordinatorExecutorThreadFactory.
> h1. log:
> !image-2023-06-30-10-53-50-280.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui opened a new pull request, #22911: [FLINK-32495][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail

2023-06-29 Thread via GitHub


1996fanrui opened a new pull request, #22911:
URL: https://github.com/apache/flink/pull/22911

   ## What is the purpose of the change
   
   SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
   
   I analyzed this CI run:  
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089
   
   Root cause:
   
   - The CoordinatorExecutorThreadFactory cannot create multiple threads, and many 
callers check `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
SourceCoordinatorContext.attemptReady.
   - The CoordinatorExecutorThreadFactory is shared at 
[SourceCoordinatorTestBase](https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68)
   - It is used by multiple source coordinators, and the second source coordinator 
overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first 
source.
   
   ## Brief change log
   
   Don't share the CoordinatorExecutorThreadFactory.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui opened a new pull request, #22910: [FLINK-32478][connectors/common] Fix the bug that the shared thread factory causes the source alignment unit test to fail

2023-06-29 Thread via GitHub


1996fanrui opened a new pull request, #22910:
URL: https://github.com/apache/flink/pull/22910

   ## What is the purpose of the change
   
   SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
   
   I analyzed this CI run:  
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089
   
   Root cause:
   
   - The CoordinatorExecutorThreadFactory cannot create multiple threads, and many 
callers check `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
SourceCoordinatorContext.attemptReady.
   - The CoordinatorExecutorThreadFactory is shared at 
[SourceCoordinatorTestBase](https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68)
   - It is used by multiple source coordinators, and the second source coordinator 
overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first 
source.
   
   ## Brief change log
   
   Don't share the CoordinatorExecutorThreadFactory.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32495) SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails

2023-06-29 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32495:

Component/s: Connectors / Common

> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails
> 
>
> Key: FLINK-32495
> URL: https://issues.apache.org/jira/browse/FLINK-32495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
> Attachments: image-2023-06-30-10-53-50-280.png
>
>
> SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.
> I analyzed this CI run:  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089]
> h1. Root cause:
>  * The CoordinatorExecutorThreadFactory cannot create multiple threads, and many 
> callers check `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such 
> as SourceCoordinatorContext.attemptReady.
>  * The CoordinatorExecutorThreadFactory is shared at 
> [SourceCoordinatorTestBase|https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68]
>  * It is used by multiple source coordinators, and the second source coordinator 
> overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first 
> source.
> h1. Solution:
> Don't share the CoordinatorExecutorThreadFactory.
> h1. log:
> !image-2023-06-30-10-53-50-280.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32495) SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails

2023-06-29 Thread Rui Fan (Jira)
Rui Fan created FLINK-32495:
---

 Summary: 
SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails
 Key: FLINK-32495
 URL: https://issues.apache.org/jira/browse/FLINK-32495
 Project: Flink
  Issue Type: Bug
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-06-30-10-53-50-280.png

SourceCoordinatorAlignmentTest.testWatermarkAlignmentWithTwoGroups fails.

I analyzed this CI run:  
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089]
h1. Root cause:
 * The CoordinatorExecutorThreadFactory cannot create multiple threads, and many 
callers check `coordinatorThreadFactory.isCurrentThreadCoordinatorThread()`, such as 
SourceCoordinatorContext.attemptReady.
 * The CoordinatorExecutorThreadFactory is shared at 
[SourceCoordinatorTestBase|https://github.com/apache/flink/blob/21eba4ca4cb235a2189c94cdbf3abcec5cde1e6e/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java#L68]
 * It is used by multiple source coordinators, and the second source coordinator 
overwrites CoordinatorExecutorThreadFactory#t, so the check fails for the first 
source.

h1. Solution:

Don't share the CoordinatorExecutorThreadFactory.
h1. log:

!image-2023-06-30-10-53-50-280.png!

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23955) submit flink sql job error when flink HA on yarn is configured

2023-06-29 Thread Tongtong Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738855#comment-17738855
 ] 

Tongtong Zhu commented on FLINK-23955:
--

If this issue still occurs in the new version, I will fix it.

> submit flink sql job error when flink HA on yarn is configured
> --
>
> Key: FLINK-23955
> URL: https://issues.apache.org/jira/browse/FLINK-23955
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Jun Zhang
>Priority: Major
> Fix For: 1.14.7, 1.18.0
>
>
> 1. When I configured Flink HA like this:
> {code:java}
> high-availability: zookeeper
> high-availability.storageDir: hdfs://xxx/flink/ha/
> high-availability.zookeeper.quorum: x:2181
> high-availability.zookeeper.path.root: /flink
> {code}
> 2. I started a Flink session cluster.
> 3. I submitted a Flink SQL job via the SQL client and set:
> {code:java}
> set execution.target = yarn-per-job;
> {code}
> Then I got the following error:
>  
>  
> {code:java}
> 2021-08-25 10:40:39,500 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Found Web Interface master3:38052 of application 
> 'application_1629858010528_0002'.
> 2021-08-25 10:40:42,447 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error., <Exception on server side:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (2cc726b9ae70c95f128f6c1e55cf874c)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:878)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:892)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:712)
> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>]
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> ~[?:1.8.0_291]
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 
> ~[?:1.8.0_291]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> 

[jira] [Commented] (FLINK-23955) submit flink sql job error when flink HA on yarn is configured

2023-06-29 Thread Tongtong Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738854#comment-17738854
 ] 

Tongtong Zhu commented on FLINK-23955:
--

Does this problem still occur on Flink 1.17.x?

> submit flink sql job error when flink HA on yarn is configured
> --
>
> Key: FLINK-23955
> URL: https://issues.apache.org/jira/browse/FLINK-23955
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Jun Zhang
>Priority: Major
> Fix For: 1.14.7, 1.18.0
>
>
> 1. When I configured Flink HA like this:
> {code:java}
> high-availability: zookeeper
> high-availability.storageDir: hdfs://xxx/flink/ha/
> high-availability.zookeeper.quorum: x:2181
> high-availability.zookeeper.path.root: /flink
> {code}
> 2. I started a Flink session cluster.
> 3. I submitted a Flink SQL job via the SQL client and set:
> {code:java}
> set execution.target = yarn-per-job;
> {code}
> Then I got the following error:
>  
>  
> {code:java}
> 2021-08-25 10:40:39,500 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Found Web Interface master3:38052 of application 
> 'application_1629858010528_0002'.
> 2021-08-25 10:40:42,447 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error., <Exception on server side:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (2cc726b9ae70c95f128f6c1e55cf874c)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:878)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:892)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:712)
> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>]
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> ~[?:1.8.0_291]
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 
> ~[?:1.8.0_291]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> 

[GitHub] [flink] iceiceiceCN commented on a diff in pull request #22831: [FLINK-32388]Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-06-29 Thread via GitHub


iceiceiceCN commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1247352456


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##
@@ -63,11 +70,26 @@ public List<PartitionCommitPolicy> createPolicyChain(
 successFileName, fsSupplier.get());
 case PartitionCommitPolicy.CUSTOM:
 try {
-return (PartitionCommitPolicy)
-
cl.loadClass(customClass).newInstance();
+if 
(!CollectionUtil.isNullOrEmpty(parameters)) {

Review Comment:
   OK, and where can I see this CI failure? I hope to learn more :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on PR #22804:
URL: https://github.com/apache/flink/pull/22804#issuecomment-1614034245

   @reswqa Thanks for the review. Note that I addressed the comments in a fixup 
commit only to make the changes easier to review.
   After the review, the files (e.g., the test classes) in the fixup commit will 
be squashed into the previous commits alongside their production code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] iceiceiceCN commented on a diff in pull request #22831: [FLINK-32388]Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-06-29 Thread via GitHub


iceiceiceCN commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1247350845


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java:
##
@@ -149,4 +163,54 @@ private static String buildSinkTableSql(
 private static String buildInsertIntoSql(String sinkTable, String 
sourceTable) {
 return String.format("INSERT INTO %s SELECT * FROM %s", sinkTable, 
sourceTable);
 }
+
+@Test
+public void testFileSystemTableSinkWithCustomCommitPolicy() throws 
Exception {

Review Comment:
   More concise! Thanks for your suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247346817


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManager.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link SubpartitionDiskCacheManager} is responsible for managing the 
cached buffers in a single
+ * subpartition.
+ */
+class SubpartitionDiskCacheManager {
+
+/**
+ * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+ * buffer is the buffer index.
+ *
+ * Note that this field can be accessed by the task thread or the write 
IO thread, so
+ * thread safety must be ensured.
+ */
+private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+/**
+ * Record the buffer index in the {@link SubpartitionDiskCacheManager}. 
Each time a new buffer
+ * is added to the {@code allBuffers}, this field is increased by one.
+ */
+private int bufferIndex;
+
+/**
+ * Record the segment id that is currently being written to. Each time a 
segment is finished, this field
+ * is increased by one.
+ */
+private int segmentIndex;

Review Comment:
   Guarded it by `allBuffers`.
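   
   For readers skimming the thread, a minimal sketch of that guarding pattern (illustrative names, not the actual class):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of "guarded by allBuffers": every access to the deque and
// to the counters it protects takes the deque's own monitor, so the task thread
// and the write IO thread never observe a torn update.
final class GuardedBySketch {
    private final Deque<Object> allBuffers = new ArrayDeque<>();
    private int bufferIndex;  // guarded by allBuffers
    private int segmentIndex; // guarded by allBuffers

    void append(Object buffer) {
        synchronized (allBuffers) {
            allBuffers.add(buffer);
            bufferIndex++;
        }
    }

    void finishSegment() {
        synchronized (allBuffers) {
            segmentIndex++;
        }
    }

    int getSegmentIndex() {
        synchronized (allBuffers) {
            return segmentIndex;
        }
    }
}
```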



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247346676


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManager.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link SubpartitionDiskCacheManager} is responsible for managing the 
cached buffers in a single
+ * subpartition.
+ */
+class SubpartitionDiskCacheManager {
+
+/**
+ * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+ * buffer is the buffer index.
+ *
+ * Note that this field can be accessed by the task thread or the write 
IO thread, so
+ * thread safety must be ensured.
+ */
+private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+/**
+ * Record the buffer index in the {@link SubpartitionDiskCacheManager}. 
Each time a new buffer
+ * is added to the {@code allBuffers}, this field is increased by one.
+ */
+private int bufferIndex;

Review Comment:
   Fixed and added the doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247346519


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers 
before flushing to files.
+ */
+class DiskCacheManager {
+
+private final TieredStoragePartitionId partitionId;
+
+private final int numSubpartitions;
+
+private final PartitionFileWriter partitionFileWriter;
+
+private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+/** Whether the current flush process has completed. */
+private CompletableFuture<Void> hasFlushCompleted;
+
+DiskCacheManager(
+TieredStoragePartitionId partitionId,
+int numSubpartitions,
+TieredStorageMemoryManager storageMemoryManager,
+PartitionFileWriter partitionFileWriter) {
+this.partitionId = partitionId;
+this.numSubpartitions = numSubpartitions;
+this.partitionFileWriter = partitionFileWriter;
+this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
+this.hasFlushCompleted = FutureUtils.completedVoidFuture();
+
+for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+subpartitionCacheManagers[subpartitionId] = new 
SubpartitionDiskCacheManager();
+}
+
storageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
+}
+
+// 
+//  Called by DiskTierProducerAgent
+// 
+
+/**
+ * Append buffer to {@link DiskCacheManager}.
+ *
+ * @param buffer to be managed by this class.
+ * @param subpartitionId the subpartition of this record.
+ */
+void append(Buffer buffer, int subpartitionId) {
+subpartitionCacheManagers[subpartitionId].append(buffer);
+}
+
+/**
+ * Append the end-of-segment event to {@link DiskCacheManager}, which 
indicates the segment has
+ * finished.
+ *
+ * @param record the end-of-segment event
+ * @param subpartitionId target subpartition of this record.
+ * @param dataType the type of this record, i.e., whether it is data or an event.
+ */
+void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId, 
Buffer.DataType dataType) {

Review Comment:
   This should not be removed: the previous review comments concluded that the 
end-of-segment concept should not be known to both `SubpartitionDiskCacheManager` 
and `DiskProducerAgent`, which I also think makes sense.
   
   So the `Buffer.DataType` parameter is there to avoid exposing the 
end-of-segment concept to `SubpartitionDiskCacheManager`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManager.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247344923


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers 
before flushing to files.
+ */
+class DiskCacheManager {
+
+private final TieredStoragePartitionId partitionId;
+
+private final int numSubpartitions;
+
+private final PartitionFileWriter partitionFileWriter;
+
+private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+/** Whether the current flush process has completed. */
+private CompletableFuture<Void> hasFlushCompleted;
+
+DiskCacheManager(
+TieredStoragePartitionId partitionId,
+int numSubpartitions,
+TieredStorageMemoryManager storageMemoryManager,
+PartitionFileWriter partitionFileWriter) {
+this.partitionId = partitionId;
+this.numSubpartitions = numSubpartitions;
+this.partitionFileWriter = partitionFileWriter;
+this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
+this.hasFlushCompleted = FutureUtils.completedVoidFuture();
+
+for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+subpartitionCacheManagers[subpartitionId] = new 
SubpartitionDiskCacheManager();
+}
+
storageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
+}
+
+// 
+//  Called by DiskTierProducerAgent
+// 
+
+/**
+ * Append buffer to {@link DiskCacheManager}.
+ *
+ * @param buffer to be managed by this class.
+ * @param subpartitionId the subpartition of this record.
+ */
+void append(Buffer buffer, int subpartitionId) {
+subpartitionCacheManagers[subpartitionId].append(buffer);
+}
+
+/**
+ * Append the end-of-segment event to {@link DiskCacheManager}, which 
indicates the segment has
+ * finished.
+ *
+ * @param record the end-of-segment event
+ * @param subpartitionId target subpartition of this record.
+ * @param dataType the type of this record, i.e., whether it is data or an event.
+ */
+void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId, 
Buffer.DataType dataType) {
+
subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record, 
dataType);
+
+// When finishing a segment, the buffers should be flushed because the next
+// segment may be written to another tier. If the buffers in this tier are not
+// flushed here, the next segment in another tier may get stuck waiting for
+// buffers. This flush has a low trigger frequency, so its impact on
+// performance is relatively small.
+forceFlushCachedBuffers();
+}
+
+/**
+ * Return the current buffer index.
+ *
+ * @param subpartitionId the target subpartition id
+ * @return the finished buffer index
+ */
+int getBufferIndex(int subpartitionId) {
+return subpartitionCacheManagers[subpartitionId].getBufferIndex();
+}
+
+/** Close this {@link 

[jira] [Commented] (FLINK-32348) MongoDB tests are flaky and time out

2023-06-29 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738852#comment-17738852
 ] 

Jiabao Sun commented on FLINK-32348:


Hi [~martijnvisser]

I tried to investigate and reproduce the issue, and found that when the 
`AsyncCheckpointRunnable` hits a `CancellationException`, the task never stops 
as expected.

I think this problem may be related to 
[FLINK-25902|https://issues.apache.org/jira/browse/FLINK-25902].
The root cause remains to be investigated further.


{code:sh}
00:30:26,533 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering Checkpoint 4 for job 79765d8c304b804a1adbd3677bc39708 failed due to org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a checkpoint request for unknown task 6585a08f46e2d380ebe0ac7fde3739a7_cbc357ccb763df2852fee8c4fc7d55f2_1_1. Failure reason: Task local checkpoint failure.
00:30:26,533 [Checkpoint Timer] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 4 for job 79765d8c304b804a1adbd3677bc39708. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a checkpoint request for unknown task 6585a08f46e2d380ebe0ac7fde3739a7_cbc357ccb763df2852fee8c4fc7d55f2_1_1. Failure reason: Task local checkpoint failure.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1025) ~[flink-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[?:?]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[?:?]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[?:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) ~[?:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) ~[?:?]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) ~[?:?]
    at akka.actor.ActorCell.invoke(ActorCell.scala:547) ~[?:?]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_372]
00:30:26,554 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Sink: Data stream collect sink (1/1)#1 - asynchronous part of checkpoint 4 could not be completed.
java.util.concurrent.CancellationException: null
    at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_372]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_372]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-core-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247344394


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers before flushing to
+ * files.
+ */
+class DiskCacheManager {
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int numSubpartitions;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+    /** Whether the current flush process has completed. */
+    private CompletableFuture<Void> hasFlushCompleted;

Review Comment:
   Renamed and added docs to describe this field.
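
   For context, a minimal standalone sketch (made-up names, not the PR's code) of the pattern behind this field: a `CompletableFuture` marks the in-flight flush, so a new flush is only forced once the previous one completes.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Illustrative only: gate forced flushes on the previous flush having finished.
   class FlushGate {
       private CompletableFuture<Void> inFlightFlush = CompletableFuture.completedFuture(null);

       synchronized void maybeForceFlush(Runnable flushAction) {
           // Skip if a flush is still running; this keeps the trigger frequency low.
           if (!inFlightFlush.isDone()) {
               return;
           }
           inFlightFlush = CompletableFuture.runAsync(flushAction);
       }
   }
   ```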



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247344170


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link ProducerMergedPartitionFileIndex} is used by {@link ProducerMergedPartitionFileWriter}
+ * and {@link ProducerMergedPartitionFileReader}, to maintain the offset of each buffer in the
+ * physical file.
+ *
+ * <p>For efficiency, buffers from the same subpartition that are both logically (i.e. index in the
+ * subpartition) and physically (i.e. offset in the file) consecutive are combined into a {@link
+ * Region}.
+ *
+ * <p>For example, the following buffers (indicated by subpartitionId-bufferIndex):
+ *   1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 2-6
+ * will be combined into 5 regions (separated by '|'):
+ *   1-1, 1-2, 1-3 | 2-1, 2-2 | 2-5 | 1-4, 1-5 | 2-6
+ */
+public class ProducerMergedPartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartition.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO threads, so a lock is
+     * used to ensure thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<TreeMap<Integer, Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public ProducerMergedPartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
+            subpartitionRegions.add(new TreeMap<>());
+        }
+    }
+
+    /**
+     * Add buffers to the index.
+     *
+     * @param buffers to be added. Note, the provided buffers are required to be physically
+     *     consecutive and in the same order as in the file.
+     */
+    void addBuffers(List<FlushedBuffer> buffers) {
+        if (buffers.isEmpty()) {
+            return;
+        }
+
+        Map<Integer, List<Region>> convertedRegions = convertToRegions(buffers);
+        synchronized (lock) {
+            convertedRegions.forEach(
+                    (subpartition, regions) -> {
+                        Map<Integer, Region> regionMap = subpartitionRegions.get(subpartition);
+                        for (Region region : regions) {
+                            regionMap.put(region.getFirstBufferIndex(), region);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Get the subpartition's {@link Region} containing the specific buffer index.
+     *
+     * @param subpartitionId the subpartition id
+     * @param bufferIndex the buffer index
+     * @return the region containing the buffer index, or empty if the region is not found.
+     */
+    Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int bufferIndex) {
+        synchronized (lock) {
+            if (isReleased) {
+                return Optional.empty();
+            }
+            Map.Entry<Integer, Region> regionEntry =
+                    subpartitionRegions
+                            .get(subpartitionId.getSubpartitionId())
+                            .floorEntry(bufferIndex);
+            if (regionEntry == null) {
+                return Optional.empty();
+            }
+            Region region = regionEntry.getValue();
+            return bufferIndex < region.getFirstBufferIndex() + region.numBuffers
+                    ? Optional.of(region)
+                    : Optional.empty();
+        }
+    }
+
+    void release() {
+        synchronized (lock) {
+            subpartitionRegions.clear();
+
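
The `floorEntry` lookup quoted above is the heart of the index. A standalone sketch (made-up names, not the PR's code) of why keying regions by their first buffer index turns the containing-region query into a single `TreeMap` call:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Illustrative only: regions are keyed by their first buffer index, so
// floorEntry returns the only candidate region that could contain the index.
class RegionLookup {
    static class Region {
        final int firstBufferIndex;
        final int numBuffers;

        Region(int firstBufferIndex, int numBuffers) {
            this.firstBufferIndex = firstBufferIndex;
            this.numBuffers = numBuffers;
        }
    }

    private final TreeMap<Integer, Region> regions = new TreeMap<>();

    Optional<Region> find(int bufferIndex) {
        Map.Entry<Integer, Region> entry = regions.floorEntry(bufferIndex);
        if (entry == null) {
            return Optional.empty();
        }
        Region region = entry.getValue();
        return bufferIndex < region.firstBufferIndex + region.numBuffers
                ? Optional.of(region)
                : Optional.empty();
    }
}
```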

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247343762


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1247343634


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileWriter.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link PartitionFileWriter} interface defines the write logic for different types of shuffle
+ * files.
+ */
+public interface PartitionFileWriter {
+
+    /**
+     * Write the buffers to the partition file. The written buffers may belong to multiple
+     * subpartitions, but these buffers will be consecutive in the file.
+     *
+     * @param partitionId the partition id
+     * @param buffersToWrite the buffers to be written to the partition file
+     * @return the completable future indicating whether the writing file process has finished. If
+     *     the {@link CompletableFuture} is completed, the write process is completed.
+     */
+    CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, List<SubpartitionBufferContext> buffersToWrite);
+
+    /** Release all the resources of the {@link PartitionFileWriter}. */
+    void release();
+
+    /**
+     * The {@link SubpartitionBufferContext} contains all the buffers belonging to one
+     * subpartition.
+     */
+    class SubpartitionBufferContext {
+
+        /** The subpartition id. */
+        private final int subpartitionId;
+
+        /** The {@link SegmentBufferContext}s belonging to the subpartition. */
+        private final List<SegmentBufferContext> segmentBufferContexts;
+
+        public SubpartitionBufferContext(
+                int subpartitionId, List<SegmentBufferContext> segmentBufferContexts) {
+            this.subpartitionId = subpartitionId;
+            this.segmentBufferContexts = segmentBufferContexts;
+        }
+
+        public int getSubpartitionId() {
+            return subpartitionId;
+        }
+
+        public List<SegmentBufferContext> getSegmentBufferContexts() {
+            return segmentBufferContexts;
+        }
+    }
+
+    /**
+     * The {@link SegmentBufferContext} contains all the buffers belonging to the segment. Note
+     * that when this indicates the segment is finished, the field {@code bufferAndIndexes} should
+     * be empty.
+     */
+    class SegmentBufferContext {
+
+        /** The segment id. */
+        private final int segmentId;
+
+        /** All the buffers belonging to the segment. */
+        private final List<Tuple2<Buffer, Integer>> bufferAndIndexes;
+
+        /** Whether it is necessary to finish the segment. */
+        private final boolean segmentFinished;
+
+        public SegmentBufferContext(
+                int segmentId,
+                List<Tuple2<Buffer, Integer>> bufferAndIndexes,
+                boolean segmentFinished) {
+            this.segmentId = segmentId;
+            this.bufferAndIndexes = bufferAndIndexes;
+            this.segmentFinished = segmentFinished;
+        }
+
+        public int getSegmentId() {
+            return segmentId;
+        }
+
+        public List<Tuple2<Buffer, Integer>> getBufferAndIndexes() {
+            return bufferAndIndexes;
+        }
+
+        public boolean isSegmentFinished() {

Review Comment:
   This method is designed to be used in the hash mode partition file.
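
   For context, a hedged caller sketch (all names made up, not this PR's code) of the asynchronous contract described in the `write(...)` javadoc above: the returned future lets callers chain cleanup instead of blocking.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Illustrative only: the future completes once the buffers are on disk.
   class WriterCallerSketch {

       interface Writer {
           CompletableFuture<Void> write(String partitionId, List<byte[]> buffers);
       }

       static void flushAndReport(Writer writer, List<byte[]> buffers) {
           writer.write("partition-0", buffers)
                   .thenRun(() -> System.out.println("flushed " + buffers.size() + " buffers"));
       }
   }
   ```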



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32281) Enable two-phase HashAgg default when agg function support adaptive local HashAgg

2023-06-29 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu updated FLINK-32281:
--
Description: For the HashAgg operator, the planner currently prefers a 
one-phase agg when the statistics cannot be accurately estimated. In some 
production queries it may be more reasonable to choose a two-phase agg. In the 
TPC-DS cases, we find that for some patterns choosing the two-phase agg 
significantly reduces the query runtime. In 
https://issues.apache.org/jira/browse/FLINK-30542 , we introduced the adaptive 
local hashagg, which can adaptively skip aggregation when the local-phase 
aggregation degree is relatively low; this can greatly improve the performance 
of two-phase aggregation in some queries. Based on the above background, in 
this issue we propose to turn on the two-phase agg by default for functions 
that support adaptive local hashagg, such as sum/count/min/max, so as to 
exploit the ability of adaptive local hashagg to improve the performance of agg 
queries. For OFCG, if we turn on the two-phase agg by default, we can also let 
the local agg operator be put into the fused operator, so as to enjoy the 
benefits of OFCG.
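
For reference, the phase choice can already be forced per session via configuration; a hedged sketch (the table name is made up) of the knob whose effective default this issue would change for sum/count/min/max:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AggPhaseStrategyExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // AUTO lets the planner decide; TWO_PHASE forces local + global aggregation.
        tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        tEnv.executeSql("SELECT a, SUM(b), COUNT(c) FROM MyTable GROUP BY a"); // MyTable is assumed
    }
}
{code}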

> Enable two-phase HashAgg default when agg function support adaptive local 
> HashAgg
> -
>
> Key: FLINK-32281
> URL: https://issues.apache.org/jira/browse/FLINK-32281
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: dalongliu
>Priority: Major
>
> For the HashAgg operator, the planner currently prefers a one-phase agg when 
> the statistics cannot be accurately estimated. In some production queries it 
> may be more reasonable to choose a two-phase agg. In the TPC-DS cases, we find 
> that for some patterns choosing the two-phase agg significantly reduces the 
> query runtime. In https://issues.apache.org/jira/browse/FLINK-30542 , we 
> introduced the adaptive local hashagg, which can adaptively skip aggregation 
> when the local-phase aggregation degree is relatively low; this can greatly 
> improve the performance of two-phase aggregation in some queries. Based on the 
> above background, in this issue we propose to turn on the two-phase agg by 
> default for functions that support adaptive local hashagg, such as 
> sum/count/min/max, so as to exploit the ability of adaptive local hashagg to 
> improve the performance of agg queries. For OFCG, if we turn on the two-phase 
> agg by default, we can also let the local agg operator be put into the fused 
> operator, so as to enjoy the benefits of OFCG.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1614023855

   @liuyongvs ok got it. I will fix it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-29 Thread loyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738851#comment-17738851
 ] 

loyi commented on FLINK-23190:
--

[~heigebupahei] I implemented it on Flink 1.13.2, so I am not sure whether it 
is related to version changes. I will try to find out the reason, but I can't 
guarantee it.

 

 

 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs are all runing on 
> active yarn mode, the job with smaller source parallelism offen cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManger start register slots to slotManager , current processing 
> logic will choose  the first pendingSlot which meet its resource 
> requirements.  The "random" strategy usually causes uneven task allocation 
> when source-operator's parallelism is significantly below process-operator's. 
>   A simple feasible idea  is  {color:#de350b}partition{color} the current  
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (such as  let 
> AllocationID bring the detail)  , then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A、B、C、D reprents  {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager will provide 4 slots one time, and each group will get 1 
> slot according their proportion (1/4), the final allocation result is below:
> [ABCD] : deploye on 10 different taskmangers
> [BCD]: deploye on 10 different taskmangers
> [CD]: deploye on 10  different taskmangers
> [D]: deploye on 10 different taskmangers
>  
> I have implement a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>   based on Flink-1.12.3 ,  the patch version has {color:#de350b}fully 
> evenly{color} task allocation , and works well on my workload .  Are there 
> other point that have not been considered or  does it conflict with future 
> plans?      Sorry for my poor english.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32494) Cannot convert list literal to Table with PyFlink

2023-06-29 Thread Yunfeng Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738841#comment-17738841
 ] 

Yunfeng Zhou commented on FLINK-32494:
--

Hi [~hxbks2ks], could you please take a look at this ticket?

> Cannot convert list literal to Table with PyFlink
> -
>
> Key: FLINK-32494
> URL: https://issues.apache.org/jira/browse/FLINK-32494
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.1
>Reporter: Yunfeng Zhou
>Priority: Major
>
> During my attempt to convert a list or array to a PyFlink Table using the 
> following program
> {code:python}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> from pyflink.table import (
> expressions as native_flink_expr,
> StreamTableEnvironment,
> )
> from pyflink.table.types import DataTypes
> if __name__ == "__main__":
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> table = t_env.from_elements([(1, ), (2, ), (3, )])
> # table = table.add_or_replace_columns(
> # native_flink_expr.lit([], 
> DataTypes.ARRAY(DataTypes.INT()).not_null())
> # )
> table = table.add_or_replace_columns(
> 
> native_flink_expr.lit(get_gateway().new_array(get_gateway().jvm.java.lang.Integer,
>  0))
> )
> table.execute().print()
> {code}
> The following exception would be thrown
> {code}
> ClassCastException: [Ljava.lang.Integer; cannot be cast to java.util.List
> {code}
> If I use the following code to create the literal expression along with the 
> program above
> {code:python}
> table = table.add_or_replace_columns(
> native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
> )
> {code}
> The following exception would be thrown
> {code}
> Data type 'ARRAY NOT NULL' with conversion class '[Ljava.lang.Integer;' 
> does not support a value literal of class 'java.util.ArrayList'.
> {code}
> As PyFlink does not provide documentation explaining how to create a Table 
> with list literals, and my attempts described above both fail, there might be 
> a bug in PyFlink around this functionality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32494) Cannot convert list literal to Table with PyFlink

2023-06-29 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-32494:


 Summary: Cannot convert list literal to Table with PyFlink
 Key: FLINK-32494
 URL: https://issues.apache.org/jira/browse/FLINK-32494
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.16.1
Reporter: Yunfeng Zhou


During my attempt to convert a list or array to a PyFlink Table using the 
following program

{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import (
expressions as native_flink_expr,
StreamTableEnvironment,
)
from pyflink.table.types import DataTypes

if __name__ == "__main__":
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_elements([(1, ), (2, ), (3, )])
# table = table.add_or_replace_columns(
# native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
# )
table = table.add_or_replace_columns(

native_flink_expr.lit(get_gateway().new_array(get_gateway().jvm.java.lang.Integer,
 0))
)
table.execute().print()
{code}

The following exception would be thrown
{code}
ClassCastException: [Ljava.lang.Integer; cannot be cast to java.util.List
{code}

If I use the following code to create the literal expression along with the 
program above
{code:python}
table = table.add_or_replace_columns(
native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
)
{code}

The following exception would be thrown
{code}
Data type 'ARRAY NOT NULL' with conversion class '[Ljava.lang.Integer;' 
does not support a value literal of class 'java.util.ArrayList'.
{code}

As PyFlink does not provide documentation explaining how to create a Table with 
list literals, and my attempts described above both fail, there might be a bug 
in PyFlink around this functionality.
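
As a cross-check from the Java side (a hedged sketch, not a confirmed workaround): the error message names the conversion class [Ljava.lang.Integer;, i.e. Integer[], so in the Java Table API the typed lit(...) call is expected to accept a Java array where the Python-generated java.util.ArrayList was rejected.

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.lit;

public class ArrayLiteralSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Table table = tEnv.fromValues(1, 2, 3);
        // An empty Integer[] matches the conversion class named in the error message.
        table = table.addOrReplaceColumns(
                lit(new Integer[0], DataTypes.ARRAY(DataTypes.INT()).notNull()).as("arr"));
        table.execute().print();
    }
}
{code}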






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] liuyongvs commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


liuyongvs commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1614000774

   @dawidwys @hanyuzheng7 @snuyanzin the git commit message should look like 
   `[FLINK-][table] Add built-in xxx function.`
   
   There is a sample: 
https://github.com/apache/flink/commit/79b06cf95389491d6a800ccebdcc8feb2b6f9025


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32493) Introduce RuntimeFilterBuilderOperator to build a BloomFilter from build side data of shuffle join

2023-06-29 Thread dalongliu (Jira)
dalongliu created FLINK-32493:
-

 Summary: Introduce RuntimeFilterBuilderOperator to build a 
BloomFilter from build side data of shuffle join
 Key: FLINK-32493
 URL: https://issues.apache.org/jira/browse/FLINK-32493
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: dalongliu
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32492) Introduce FlinkRuntimeFilterProgram to inject runtime filter

2023-06-29 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32492:
--

 Summary: Introduce FlinkRuntimeFilterProgram to inject runtime 
filter
 Key: FLINK-32492
 URL: https://issues.apache.org/jira/browse/FLINK-32492
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Lijie Wang
Assignee: Lijie Wang
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] devlhl commented on pull request #660: [hotfix] Fix flinkVersionNew >= 1.15

2023-06-29 Thread via GitHub


devlhl commented on PR #660:
URL: https://github.com/apache/flink-web/pull/660#issuecomment-1613995036

   ext {
       // because versions 1.15+ are published without the scalaBinaryVersion suffix
       flinkVersionNew = flinkVersion.toString().replace("-SNAPSHOT", "") >= "1.15"
   }
   if (flinkVersionNew) {
       mainClassName = '${organization}.DataStreamJob'
   } else {
       mainClassName = '${organization}.StreamingJob'
   }
   dependencies {
       if (flinkVersionNew) {
           implementation "org.apache.flink:flink-streaming-java:\${flinkVersion}"
           implementation "org.apache.flink:flink-clients:\${flinkVersion}"
       } else {
           implementation "org.apache.flink:flink-streaming-java_\${scalaBinaryVersion}:\${flinkVersion}"
           implementation "org.apache.flink:flink-clients_\${scalaBinaryVersion}:\${flinkVersion}"
       }
   }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-32491) Introduce RuntimeFilterOperator to support runtime filter which can reduce the shuffle data size before shuffle join

2023-06-29 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-32491:
--

Assignee: dalongliu

> Introduce RuntimeFilterOperator to support runtime filter which can reduce 
> the shuffle data size before shuffle join
> 
>
> Key: FLINK-32491
> URL: https://issues.apache.org/jira/browse/FLINK-32491
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32489) Support serialize and merge BloomFilter for runtime filter

2023-06-29 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-32489:
--

Assignee: dalongliu

> Support serialize and merge BloomFilter for runtime filter
> --
>
> Key: FLINK-32489
> URL: https://issues.apache.org/jira/browse/FLINK-32489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Runtime filter needs to use a BloomFilter; it is transferred from the runtime 
> filter builder operator to the runtime filter operator via the network, so 
> serialization and merging are needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1613986096

   @snuyanzin could you help me with another code review? Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #22805: [FLINK-32365][orc]get orc table statistics in parallel

2023-06-29 Thread via GitHub


luoyuxia commented on PR #22805:
URL: https://github.com/apache/flink/pull/22805#issuecomment-1613981013

   @Baibaiwuguo The test fails.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] lindong28 merged pull request #659: Release Flink ML 2.3.0

2023-06-29 Thread via GitHub


lindong28 merged PR #659:
URL: https://github.com/apache/flink-web/pull/659


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] lindong28 commented on pull request #659: Release Flink ML 2.3.0

2023-06-29 Thread via GitHub


lindong28 commented on PR #659:
URL: https://github.com/apache/flink-web/pull/659#issuecomment-1613949835

   Thanks @zhipeng93 for the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

2023-06-29 Thread via GitHub


ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1613909720

   Hi @Sergey,
   
   Unfortunately, I will not be able to work on this due to some personal work.
   
   It would be better to reassign this task.
   
   Regards,
   Ashmeet.
   
   
   On Fri, Jun 16, 2023, 04:24 ashmeet kandhari ***@***.***>
   wrote:
   
   > Hi @Sergey,
   >
   > I am on vacation and will be back in July.
   >
   > I can continue after that.
   >
   > Thanks
   > Ashmeet
   >
   > On Tue, Jun 6, 2023, 13:11 Sergey Nuyanzin ***@***.***>
   > wrote:
   >
   >> Hi @ashmeet-kandhari  I wonder
   >> whether you going to continue working on this PR or not?
   >>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260]-table-Add-ARRAY_SLICE-function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1613802619

   @snuyanzin do you think the description is OK? I think it's a bit long, and 
it's not easy to explain this clearly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22909: [Flink 32257] table add array max function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22909:
URL: https://github.com/apache/flink/pull/22909#issuecomment-1613789458

   @dawidwys This is the new PR for array_max. It fixes 
ArrayElementOutputTypeStrategyTest and is ready to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22909: [Flink 32257] table add array max function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22909:
URL: https://github.com/apache/flink/pull/22909#issuecomment-1613785818

   
   Screenshots:
   https://github.com/apache/flink/assets/135176127/0c3dbc34-0cba-4c31-a9e7-e7cb5b01dd1f
   https://github.com/apache/flink/assets/135176127/97c5bd81-5da1-4ec1-8dbe-196b3188ae4b
   https://github.com/apache/flink/assets/135176127/882227d2-caf0-4ae5-91a7-515fe7efecb1
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738764#comment-17738764
 ] 

Sergey Nuyanzin commented on FLINK-32478:
-

yep, there is another case for 1.17: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50668=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9089

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start due to the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22909: [Flink 32257] table add array max function

2023-06-29 Thread via GitHub


flinkbot commented on PR #22909:
URL: https://github.com/apache/flink/pull/22909#issuecomment-1613625573

   
   ## CI report:
   
   * 650fac3d7ed0e0926c00cb3888fe0136c81192bf UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31800) Update tika to current

2023-06-29 Thread Morey Straus (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738730#comment-17738730
 ] 

Morey Straus commented on FLINK-31800:
--

[~martijnvisser] I'm being told it's in opt/flink/bin/bash-java-utils.jar - 
would you mind checking?

> Update tika to current
> --
>
> Key: FLINK-31800
> URL: https://issues.apache.org/jira/browse/FLINK-31800
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.17.0
>Reporter: Morey Straus
>Priority: Major
>  Labels: security
>
> Multiple vulns in org.apache.tika:tika-core-1.28.1
> https://nvd.nist.gov/vuln/detail/CVE-2022-30126
> https://nvd.nist.gov/vuln/detail/CVE-2022-30973
> https://nvd.nist.gov/vuln/detail/CVE-2022-30126
> https://nvd.nist.gov/vuln/detail/CVE-2022-25169



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hanyuzheng7 opened a new pull request, #22909: [Flink 32257] table add array max function

2023-06-29 Thread via GitHub


hanyuzheng7 opened a new pull request, #22909:
URL: https://github.com/apache/flink/pull/22909

   ## What is the purpose of the change
   This is an implementation of ARRAY_MAX
   
   
   ## Brief change log
   The array_max() function gets the maximum element from the input array.
   The result matches the type of the elements. NULL elements are skipped. If 
the array is empty or contains only NULL elements, NULL is returned.
   
   Arguments
   
   array: Any ARRAY with elements for which order is supported.
   
   Syntax
   array_max(array)
   
   Returns
   The result matches the type of the elements. NULL elements are skipped. If 
the array is empty or contains only NULL elements, NULL is returned.
   
   Examples
   
   SQL
   
   ```
   > SELECT array_max(array(1, 20, NULL, 3)); -- Spark SQL, returns 20
   
   -- Flink SQL: SELECT array_max(array[1, 20, null, 3]); returns 20
   ```
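   
   A runnable sanity check along the same lines (hedged; it assumes the function is available in the session):
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class ArrayMaxSmokeTest {
       public static void main(String[] args) {
           TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // NULL is cast so the array element type is unambiguous; expected output: 20
           tEnv.executeSql("SELECT array_max(array[1, 20, CAST(NULL AS INT), 3]) AS res").print();
       }
   }
   ```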
   
   See also
   spark https://spark.apache.org/docs/latest/api/sql/index.html#array_max
   
   presto https://prestodb.io/docs/current/functions/array.html
   
   ## Verifying this change
   This change added tests in CollectionFunctionsITCase.
   
   ## Does this pull request potentially affect one of the following parts:  
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613589981

   @dawidwys ok got it. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dawidwys commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


dawidwys commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613587958

   @hanyuzheng7 I think the issue is with bridging classes...
   
   tl;dr; this should do the trick:
   
   ```
   return Stream.of(
           TestSpec.forStrategy(
                           "infer an array's element type",
                           SpecificTypeStrategies.ARRAY_ELEMENT)
                   .inputTypes(DataTypes.ARRAY(DataTypes.INT().notNull()).notNull())
                   .expectDataType(DataTypes.INT().notNull().bridgedTo(int.class)),
           TestSpec.forStrategy(
                           "infer an array's element type",
                           SpecificTypeStrategies.ARRAY_ELEMENT)
                   .inputTypes(DataTypes.ARRAY(DataTypes.INT()))
                   .expectDataType(DataTypes.INT()),
           TestSpec.forStrategy(
                           "infer an array's element type",
                           SpecificTypeStrategies.ARRAY_ELEMENT)
                   .inputTypes(DataTypes.ARRAY(DataTypes.INT().notNull()))
                   .expectDataType(DataTypes.INT().bridgedTo(int.class)));


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dawidwys commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


dawidwys commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613577033

   @liuyongvs What do you mean? Do you mean 32257 mentioned `ARRAY_MIN`? I 
switched those two in JIRA after I merged it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613564996

   @snuyanzin do you know why this happens? Before the rebase this test case 
passed; now it fails with an org.opentest4j.AssertionFailedError:
   
   Screenshot: https://github.com/apache/flink/assets/135176127/d1b7e858-7f5f-41fe-a2b8-12dc6ef4cabb
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32487) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails with RejectedExecution

2023-06-29 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738703#comment-17738703
 ] 

Martijn Visser commented on FLINK-32487:


Still weird that I couldn’t find it when I searched for it, but indeed I closed 
this because it’s a duplicate. Thanks Sergey

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart 
> fails with RejectedExecution
> -
>
> Key: FLINK-32487
> URL: https://issues.apache.org/jira/browse/FLINK-32487
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> Jun 29 03:21:25 03:21:25.954 [INFO] 
> Jun 29 03:21:25 03:21:25.954 [ERROR] Errors: 
> Jun 29 03:21:25 03:21:25.954 [ERROR]   
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart:192 
> » RejectedExecution
> Jun 29 03:21:25 03:21:25.955 [INFO] 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8613
> This is currently breaking master, release-1.17 and release-1.16



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reopened FLINK-32478:


> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start due to the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32487) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails with RejectedExecution

2023-06-29 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-32487.
--
Resolution: Duplicate

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart 
> fails with RejectedExecution
> -
>
> Key: FLINK-32487
> URL: https://issues.apache.org/jira/browse/FLINK-32487
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> Jun 29 03:21:25 03:21:25.954 [INFO] 
> Jun 29 03:21:25 03:21:25.954 [ERROR] Errors: 
> Jun 29 03:21:25 03:21:25.954 [ERROR]   
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart:192 
> » RejectedExecution
> Jun 29 03:21:25 03:21:25.955 [INFO] 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8613
> This is currently breaking master, release-1.17 and release-1.16



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-29 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738702#comment-17738702
 ] 

Martijn Visser commented on FLINK-32478:


Looking at 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50667=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8
 I don’t think this is fixed yet, [~fanrui]

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start due to the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] liuyongvs commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


liuyongvs commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613557241

   @hanyuzheng7 @dawidwys @snuyanzin and the commit message also doesn't follow the convention 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


hanyuzheng7 commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613544177

   @dawidwys ok I will fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22908: [FLINK-32489][runtime] Support serialize and merge BloomFilter for runtime filter

2023-06-29 Thread via GitHub


flinkbot commented on PR #22908:
URL: https://github.com/apache/flink/pull/22908#issuecomment-1613511155

   
   ## CI report:
   
   * 5723ee7f7c47e74253617031e13e44ba0f52a08f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32491) Introduce RuntimeFilterOperator to support runtime filter which can reduce the shuffle data size before shuffle join

2023-06-29 Thread dalongliu (Jira)
dalongliu created FLINK-32491:
-

 Summary: Introduce RuntimeFilterOperator to support runtime filter 
which can reduce the shuffle data size before shuffle join
 Key: FLINK-32491
 URL: https://issues.apache.org/jira/browse/FLINK-32491
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: dalongliu
 Fix For: 1.18.0
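
For context, a rough sketch of the idea (illustrative only, not the actual operator): the build side of the join publishes a bloom filter over its join keys, and the probe side drops records that cannot match before they are shuffled:

{code:java}
// Illustrative sketch, not Flink code. False positives are possible,
// false negatives are not, so dropping on 'false' is safe for the join.
import java.util.BitSet;

public class ProbeSideRuntimeFilter {
    private static final long NUM_BITS = 1L << 20;
    private final BitSet bits = new BitSet((int) NUM_BITS);

    public void addBuildKey(long key) {
        bits.set(index(key));
    }

    public boolean mightMatch(long probeKey) {
        return bits.get(index(probeKey));
    }

    private int index(long key) {
        // One cheap hash for brevity; a real filter would use several.
        return (int) Math.floorMod(key * 0x9E3779B97F4A7C15L, NUM_BITS);
    }
}
{code}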






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dawidwys commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


dawidwys commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613505234

   @snuyanzin Sorry for that. I must've looked at the wrong CI run, before the 
rebase. Thanks for the revert, we'll take it from here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32489) Support serialize and merge BloomFilter for runtime filter

2023-06-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32489:
---
Labels: pull-request-available  (was: )

> Support serialize and merge BloomFilter for runtime filter
> --
>
> Key: FLINK-32489
> URL: https://issues.apache.org/jira/browse/FLINK-32489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The runtime filter needs a BloomFilter, which is transferred from the 
> runtime filter builder operator to the runtime filter operator over the 
> network, so serialization and merge support are needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu opened a new pull request, #22908: [FLINK-32489][runtime] Support serialize and merge BloomFilter for runtime filter

2023-06-29 Thread via GitHub


lsyldliu opened a new pull request, #22908:
URL: https://github.com/apache/flink/pull/22908

   ## What is the purpose of the change
   
   *The runtime filter needs a BloomFilter, which is transferred from the 
runtime filter builder operator to the runtime filter operator over the 
network, so serialization and merge support are needed. A rough sketch of the 
idea is included at the end of this description.*
   
   
   ## Brief change log
   
 - *Support serialize and merge BloomFilter for runtime filter*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added unit tests in BloomFilterTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
 - The S3 file system connector: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (JavaDocs)
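   
   For reviewers, a minimal sketch of the intent (class and method names are 
illustrative, not the actual implementation): two filters built with identical 
parameters merge via a bitwise OR of their bit arrays, and the bit array is 
what gets serialized for network transfer.
   
   ```java
   // Sketch only -- assumes a BitSet-backed bloom filter.
   import java.util.BitSet;
   
   final class MergeableBloomFilter {
       private final int numHashes;
       private final BitSet bits;
   
       MergeableBloomFilter(int numBits, int numHashes) {
           this.bits = new BitSet(numBits);
           this.numHashes = numHashes;
       }
   
       private MergeableBloomFilter(BitSet bits, int numHashes) {
           this.bits = bits;
           this.numHashes = numHashes;
       }
   
       /** Merge is a bitwise OR; filters must share size and hash count. */
       void merge(MergeableBloomFilter other) {
           if (other.numHashes != numHashes) {
               throw new IllegalArgumentException("incompatible filters");
           }
           bits.or(other.bits);
       }
   
       /** Serialize, e.g. to ship from the builder operator to the filter operator. */
       byte[] toBytes() {
           return bits.toByteArray();
       }
   
       static MergeableBloomFilter fromBytes(byte[] data, int numHashes) {
           return new MergeableBloomFilter(BitSet.valueOf(data), numHashes);
       }
   }
   ```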
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28744) Upgrade Calcite version to 1.31

2023-06-29 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-28744.

Release Note: Due to CALCITE-4861 (Optimization of chained CAST calls can 
lead to unexpected behavior), Flink's casting behavior has also changed 
slightly. Some corner cases may behave differently now: for example, casting 
FLOAT/DOUBLE 9234567891.12 to INT/BIGINT now follows Java behavior for 
overflows.  (was: Since in Calcite 1.31.0 there was fixed issue 
https://issues.apache.org/jira/browse/CALCITE-4861 (Optimization of chained 
CAST calls can lead to unexpected behavior) it impacts Flink cast of numbers 
behavior. 
This could lead to different result in SQL casts for cases where overflow of 
one of the types is happened)
  Resolution: Fixed

Fixed in master: ab6a81118b45c06e822f3c77468dd7d0afb66b9e
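
For reference, a minimal sketch (assuming the release note refers to Java's 
narrowing primitive conversions) of what "Java behavior for overflows" means 
for the value above:

{code:java}
// Java narrowing conversions (JLS 5.1.3): double -> long keeps the value
// if it fits; double -> int saturates at Integer.MAX_VALUE on overflow.
public class CastOverflowDemo {
    public static void main(String[] args) {
        double d = 9234567891.12;
        System.out.println((long) d); // 9234567891  (fits in BIGINT)
        System.out.println((int) d);  // 2147483647  (Integer.MAX_VALUE)
    }
}
{code}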

> Upgrade Calcite version to 1.31
> ---
>
> Key: FLINK-28744
> URL: https://issues.apache.org/jira/browse/FLINK-28744
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should upgrade to Calcite 1.31 so we can benefit from 
> https://issues.apache.org/jira/browse/CALCITE-4865



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ferenc-csaky commented on pull request #20542: [FLINK-28910][Connectors/hbase]Fix potential data deletion while updating HBase rows

2023-06-29 Thread via GitHub


ferenc-csaky commented on PR #20542:
URL: https://github.com/apache/flink/pull/20542#issuecomment-1613487568

   > So is this superseded by #22612 or not?
   
   Yes, the two issues have the same root cause: an insert and a delete 
operation are passed to HBase with the same millisecond-precision timestamp, 
and in that case the order in which HBase executes them is not guaranteed. The 
changes made in #22612 explicitly set nanosecond-precision timestamps for the 
HBase operations, which eliminates the possibility of multiple operations "at 
the same time", so deletes and inserts are executed in the correct order. 
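   
   A rough sketch of that idea (the helper below is illustrative, not the 
actual change in #22612):
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   import org.apache.hadoop.hbase.client.Delete;
   import org.apache.hadoop.hbase.client.Put;
   import org.apache.hadoop.hbase.util.Bytes;
   
   final class NanoTimestampedMutations {
       private static final AtomicLong LAST = new AtomicLong();
   
       // Millis scaled to nanos, plus one to break ties within the same instant,
       // so every mutation gets a strictly increasing timestamp.
       static long nextTimestamp() {
           return LAST.updateAndGet(prev ->
                   Math.max(prev + 1, System.currentTimeMillis() * 1_000_000L));
       }
   
       public static void main(String[] args) {
           // A delete and a subsequent put for the same row now carry strictly
           // increasing timestamps, so HBase applies them in issue order.
           Delete delete = new Delete(Bytes.toBytes("row-1"), nextTimestamp());
           Put put = new Put(Bytes.toBytes("row-1"), nextTimestamp());
           put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
       }
   }
   ```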


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-gcp-pubsub] jayadeep-jayaraman commented on pull request #13: [FLINK-32031] Flink GCP Connector having issues with Conscrypt library

2023-06-29 Thread via GitHub


jayadeep-jayaraman commented on PR #13:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/13#issuecomment-1613483115

   > @jayadeep-jayaraman I actually can't merge this PR, because it contains a 
merge commit. Normally I would rebase myself and push the changes, but since 
you've built this on top of `main` in your fork, I can't push to that branch. 
So you'll need to rebase yourself and force push these changes
   
   Are you referring to pushing the changes to the 3.0 branch? Will the 
changes be merged to both the main and 3.0 branches? If so, can this change be 
merged into main while I create a separate PR for the 3.0 branch with my 
changes? Would that work?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-29 Thread via GitHub


snuyanzin commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1613480706

   @dawidwys , @hanyuzheng7 
   I'm really sorry, however we had to revert this change since it introduced 
a blocker issue with ArrayElementOutputTypeStrategyTest,
   which consistently fails even locally:
   ```
   [ERROR] Failures: 
   [ERROR]   
ArrayElementOutputTypeStrategyTest>TypeStrategiesTestBase.testTypeStrategy:58 
   expected: "INT NOT NULL (AtomicDataType@26e4eacd)"
but was: "INT NOT NULL (AtomicDataType@1716b369)"
   [ERROR]   
ArrayElementOutputTypeStrategyTest>TypeStrategiesTestBase.testTypeStrategy:58 
   expected: "INT (AtomicDataType@18ab74e7)"
but was: "INT (AtomicDataType@f0704a2)"
   ```
   and on the CI mirror:
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50669=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=8884
   
   I also noticed that the latest CI run for this PR fails with exactly the same problem.
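   
   For the record, one plausible reading of the failure above (the snippet is 
illustrative, not the test code): the expected values embed the default 
Object.toString(), whose identity hash code differs per instance and per JVM 
run, so string comparisons built on it are inherently unstable:
   
   ```java
   public class IdentityHashDemo {
       public static void main(String[] args) {
           Object a = new Object();
           Object b = new Object();
           System.out.println(a); // e.g. java.lang.Object@26e4eacd
           System.out.println(b); // e.g. java.lang.Object@1716b369 -- differs
       }
   }
   ```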


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32257) Add ARRAY_MAX support in SQL & Table API

2023-06-29 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738633#comment-17738633
 ] 

Matthias Pohl commented on FLINK-32257:
---

Just for the sake of documentation:
revert on master: 5ad86c2f01bea141ca76250ae4035d4e6403c8ea (FLINK-32490)

> Add ARRAY_MAX support in SQL & Table API
> 
>
> Key: FLINK-32257
> URL: https://issues.apache.org/jira/browse/FLINK-32257
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is an implementation of ARRAY_MAX
> The array_max() function returns the maximum element from the input array.
> The result matches the type of the elements. NULL elements are skipped. If 
> array is empty, or contains only NULL elements, NULL is returned.
>  
> Syntax
> array_max(array)
> Arguments
> array: Any ARRAY with elements for which order is supported.
>  
> Returns
> The result matches the type of the elements. NULL elements are skipped. If 
> array is empty, or contains only NULL elements, NULL is returned.
>  
> Examples
> SQL
>  
> > SELECT array_max(array(1, 20, NULL, 3)); 20
>  
> {code:java}
> // Flink SQL-> select array_max(array[1, 20, null, 3])
> 20{code}
>  
> See also
> spark 
> [https://spark.apache.org/docs/latest/api/sql/index.html#array_max|https://spark.apache.org/docs/latest/api/sql/index.html#array_min]
> presto [https://prestodb.io/docs/current/functions/array.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-32257) Add ARRAY_MAX support in SQL & Table API

2023-06-29 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reopened FLINK-32257:
-

> Add ARRAY_MAX support in SQL & Table API
> 
>
> Key: FLINK-32257
> URL: https://issues.apache.org/jira/browse/FLINK-32257
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is an implementation of ARRAY_MAX
> The array_max() function returns the maximum element from the input array.
> The result matches the type of the elements. NULL elements are skipped. If 
> array is empty, or contains only NULL elements, NULL is returned.
>  
> Syntax
> array_max(array)
> Arguments
> array: Any ARRAY with elements for which order is supported.
>  
> Returns
> The result matches the type of the elements. NULL elements are skipped. If 
> array is empty, or contains only NULL elements, NULL is returned.
>  
> Examples
> SQL
>  
> > SELECT array_max(array(1, 20, NULL, 3)); 20
>  
> {code:java}
> // Flink SQL-> select array_max(array[1, 20, null, 3])
> 20{code}
>  
> See also
> spark 
> [https://spark.apache.org/docs/latest/api/sql/index.html#array_max|https://spark.apache.org/docs/latest/api/sql/index.html#array_min]
> presto [https://prestodb.io/docs/current/functions/array.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

