[jira] [Updated] (FLINK-32027) Batch jobs could hang at shuffle phase when max parallelism is really large

2023-05-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-32027:

Affects Version/s: 1.16.1
   1.17.0

> Batch jobs could hang at shuffle phase when max parallelism is really large
> ---
>
> Key: FLINK-32027
> URL: https://issues.apache.org/jira/browse/FLINK-32027
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0, 1.16.1
>Reporter: Yun Tang
>Assignee: Weijie Guo
>Priority: Blocker
> Attachments: image-2023-05-08-11-12-58-361.png
>
>
> In batch execution mode with the adaptive batch scheduler, if we set the max 
> parallelism as large as 32768 (pipeline.max-parallelism), the job could hang at 
> the shuffle phase.
> It hangs for a long time and shows "No bytes sent":
>  !image-2023-05-08-11-12-58-361.png! 
> After some debugging, we can see that the downstream operator did not receive 
> the end-of-partition event.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32027) Batch jobs could hang at shuffle phase when max parallelism is really large

2023-05-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-32027:

Fix Version/s: 1.16.2
   1.17.1

> Batch jobs could hang at shuffle phase when max parallelism is really large
> ---
>
> Key: FLINK-32027
> URL: https://issues.apache.org/jira/browse/FLINK-32027
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0, 1.16.1
>Reporter: Yun Tang
>Assignee: Weijie Guo
>Priority: Blocker
> Fix For: 1.16.2, 1.17.1
>
> Attachments: image-2023-05-08-11-12-58-361.png
>
>
> In batch execution mode with the adaptive batch scheduler, if we set the max 
> parallelism as large as 32768 (pipeline.max-parallelism), the job could hang at 
> the shuffle phase.
> It hangs for a long time and shows "No bytes sent":
>  !image-2023-05-08-11-12-58-361.png! 
> After some debugging, we can see that the downstream operator did not receive 
> the end-of-partition event.





[jira] [Commented] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-04-23 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715425#comment-17715425
 ] 

Yingjie Cao commented on FLINK-31386:
-

Cherry-picked to 1.16 via 4e9516aa855cd5262a8574ecce60768553f0e7cf.

> Fix the potential deadlock issue of blocking shuffle
> 
>
> Key: FLINK-31386
> URL: https://issues.apache.org/jira/browse/FLINK-31386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.16.1
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Currently, the SortMergeResultPartition may allocate more network buffers 
> than the guaranteed size of the LocalBufferPool. As a result, some result 
> partitions may need to wait for other result partitions to release the 
> over-allocated network buffers before they can continue. However, the result 
> partitions that have allocated more than the guaranteed number of buffers rely 
> on the processing of input data to trigger data spilling and buffer recycling. 
> The input data in turn relies on the batch reading buffers used by the 
> SortMergeResultPartitionReadScheduler, which may already be taken by those 
> blocked result partitions that are waiting for buffers. Then a deadlock 
> occurs. We can fix this deadlock by reserving the guaranteed buffers 
> at initialization.





[jira] [Updated] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-04-23 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-31386:

Affects Version/s: 1.16.1
   1.16.0

> Fix the potential deadlock issue of blocking shuffle
> 
>
> Key: FLINK-31386
> URL: https://issues.apache.org/jira/browse/FLINK-31386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.16.1
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Currently, the SortMergeResultPartition may allocate more network buffers 
> than the guaranteed size of the LocalBufferPool. As a result, some result 
> partitions may need to wait for other result partitions to release the 
> over-allocated network buffers before they can continue. However, the result 
> partitions that have allocated more than the guaranteed number of buffers rely 
> on the processing of input data to trigger data spilling and buffer recycling. 
> The input data in turn relies on the batch reading buffers used by the 
> SortMergeResultPartitionReadScheduler, which may already be taken by those 
> blocked result partitions that are waiting for buffers. Then a deadlock 
> occurs. We can fix this deadlock by reserving the guaranteed buffers 
> at initialization.





[jira] [Updated] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-04-23 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-31386:

Fix Version/s: 1.16.2

> Fix the potential deadlock issue of blocking shuffle
> 
>
> Key: FLINK-31386
> URL: https://issues.apache.org/jira/browse/FLINK-31386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Currently, the SortMergeResultPartition may allocate more network buffers 
> than the guaranteed size of the LocalBufferPool. As a result, some result 
> partitions may need to wait for other result partitions to release the 
> over-allocated network buffers before they can continue. However, the result 
> partitions that have allocated more than the guaranteed number of buffers rely 
> on the processing of input data to trigger data spilling and buffer recycling. 
> The input data in turn relies on the batch reading buffers used by the 
> SortMergeResultPartitionReadScheduler, which may already be taken by those 
> blocked result partitions that are waiting for buffers. Then a deadlock 
> occurs. We can fix this deadlock by reserving the guaranteed buffers 
> at initialization.





[jira] [Commented] (FLINK-31610) Refactoring of LocalBufferPool

2023-04-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711825#comment-17711825
 ] 

Yingjie Cao commented on FLINK-31610:
-

Sorry for the delayed reply.

I totally agree that we should simplify the LocalBufferPool implementation. IMO, 
the most complicated part of the buffer pool comes from the mechanism of 
interaction between the global pool and the local pool.

One idea I can come up with is to make requesting a buffer from the global pool 
and registering the global pool availability callback atomic. That is, the 
global NetworkBufferPool would have a method like 
tryRequestMemorySegment(AvailableCallback callback): if a buffer can be 
requested, we do not register the callback; otherwise, we register the callback.

In the current implementation, there are too many branches. For example, if we 
cannot request a buffer from the global pool, we register a callback; that 
callback can be called immediately if the global pool availability is already 
complete, and the local pool will then try to request a buffer from the global 
pool again, but the request can fail; if it fails, it needs to register the 
callback again.

What do you think? 
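A minimal sketch of the atomic request-or-register idea described above (class and method names are illustrative, not Flink's actual API): under one lock, the pool either hands out a segment or registers the callback, never both, which removes the retry/re-register branches.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical global pool sketch: requesting a segment and registering the
 *  availability callback happen atomically under one lock. */
class GlobalPoolSketch {
    private final Queue<Object> segments = new ArrayDeque<>();
    private final Queue<Consumer<Object>> waiters = new ArrayDeque<>();

    GlobalPoolSketch(int size) {
        for (int i = 0; i < size; i++) segments.add(new Object());
    }

    /** Either returns a segment immediately or registers the callback,
     *  but never both. */
    synchronized Object tryRequestMemorySegment(Consumer<Object> callback) {
        Object segment = segments.poll();
        if (segment == null) {
            waiters.add(callback); // registered only because the request failed
        }
        return segment;
    }

    synchronized void recycle(Object segment) {
        Consumer<Object> waiter = waiters.poll();
        if (waiter != null) {
            waiter.accept(segment); // hand the segment straight to a waiter
        } else {
            segments.add(segment);
        }
    }
}
```

Because registration only happens while holding the same lock that observed the empty pool, the "callback fires immediately, request again, fail, register again" loop in the current implementation cannot occur.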

> Refactoring of LocalBufferPool
> --
>
> Key: FLINK-31610
> URL: https://issues.apache.org/jira/browse/FLINK-31610
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.17.0
>Reporter: Anton Kalashnikov
>Priority: Major
>
> FLINK-31293 highlighted an issue with the internal mutual consistency of 
> different fields in LocalBufferPool, e.g.:
> - `numberOfRequestedOverdraftMemorySegments`
> - `numberOfRequestedMemorySegments`
> - `availableMemorySegments`
> - `currentPoolSize`
> Most of the problems have been fixed already (I hope), but it is a good idea to 
> reorganize the code in such a way that all invariants between the fields 
> are clearly determined and difficult to break.
> As one example, I propose getting rid of 
> `numberOfRequestedOverdraftMemorySegments` and using the existing 
> `numberOfRequestedMemorySegments` instead. That means:
> - the pool is available when `!availableMemorySegments.isEmpty() && 
> unavailableSubpartitionsCount == 0`
> - we don't request a new `ordinary` buffer when 
> `numberOfRequestedMemorySegments >= currentPoolSize`, but request an 
> overdraft buffer instead
> - `setNumBuffers` should work automatically without any changes
> I think we can come up with a couple of such improvements to simplify the 
> code.
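The invariants proposed in the quoted description can be sketched as follows (field and class names are illustrative stand-ins, not the actual LocalBufferPool members):

```java
/** Sketch of the proposed invariant: overdraft requests are tracked in the
 *  same counter as ordinary ones, so only one requested-segments count exists. */
class PoolInvariantSketch {
    int numberOfRequestedMemorySegments;
    int currentPoolSize;
    int availableSegments;              // stands in for availableMemorySegments
    int unavailableSubpartitionsCount;

    /** Pool availability as proposed above. */
    boolean isAvailable() {
        return availableSegments > 0 && unavailableSubpartitionsCount == 0;
    }

    /** A request is an overdraft request once the ordinary budget is used up. */
    boolean isOverdraftRequest() {
        return numberOfRequestedMemorySegments >= currentPoolSize;
    }
}
```

With this single counter, `setNumBuffers` only needs to change `currentPoolSize`; whether further requests are ordinary or overdraft falls out of the comparison.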





[jira] (FLINK-31610) Refactoring of LocalBufferPool

2023-04-06 Thread Yingjie Cao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31610 ]


Yingjie Cao deleted comment on FLINK-31610:
-

was (Author: kevin.cyj):
Sorry for being so late.

> Refactoring of LocalBufferPool
> --
>
> Key: FLINK-31610
> URL: https://issues.apache.org/jira/browse/FLINK-31610
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.17.0
>Reporter: Anton Kalashnikov
>Priority: Major
>
> FLINK-31293 highlighted an issue with the internal mutual consistency of 
> different fields in LocalBufferPool, e.g.:
> - `numberOfRequestedOverdraftMemorySegments`
> - `numberOfRequestedMemorySegments`
> - `availableMemorySegments`
> - `currentPoolSize`
> Most of the problems have been fixed already (I hope), but it is a good idea to 
> reorganize the code in such a way that all invariants between the fields 
> are clearly determined and difficult to break.
> As one example, I propose getting rid of 
> `numberOfRequestedOverdraftMemorySegments` and using the existing 
> `numberOfRequestedMemorySegments` instead. That means:
> - the pool is available when `!availableMemorySegments.isEmpty() && 
> unavailableSubpartitionsCount == 0`
> - we don't request a new `ordinary` buffer when 
> `numberOfRequestedMemorySegments >= currentPoolSize`, but request an 
> overdraft buffer instead
> - `setNumBuffers` should work automatically without any changes
> I think we can come up with a couple of such improvements to simplify the 
> code.





[jira] [Commented] (FLINK-31610) Refactoring of LocalBufferPool

2023-04-06 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709284#comment-17709284
 ] 

Yingjie Cao commented on FLINK-31610:
-

Sorry for being so late.

> Refactoring of LocalBufferPool
> --
>
> Key: FLINK-31610
> URL: https://issues.apache.org/jira/browse/FLINK-31610
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.17.0
>Reporter: Anton Kalashnikov
>Priority: Major
>
> FLINK-31293 highlighted an issue with the internal mutual consistency of 
> different fields in LocalBufferPool, e.g.:
> - `numberOfRequestedOverdraftMemorySegments`
> - `numberOfRequestedMemorySegments`
> - `availableMemorySegments`
> - `currentPoolSize`
> Most of the problems have been fixed already (I hope), but it is a good idea to 
> reorganize the code in such a way that all invariants between the fields 
> are clearly determined and difficult to break.
> As one example, I propose getting rid of 
> `numberOfRequestedOverdraftMemorySegments` and using the existing 
> `numberOfRequestedMemorySegments` instead. That means:
> - the pool is available when `!availableMemorySegments.isEmpty() && 
> unavailableSubpartitionsCount == 0`
> - we don't request a new `ordinary` buffer when 
> `numberOfRequestedMemorySegments >= currentPoolSize`, but request an 
> overdraft buffer instead
> - `setNumBuffers` should work automatically without any changes
> I think we can come up with a couple of such improvements to simplify the 
> code.





[jira] [Closed] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-31418.
---
Resolution: Fixed

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Yuxin Tan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}





[jira] [Updated] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-31418:

Fix Version/s: 1.17.0

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Yuxin Tan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}





[jira] [Assigned] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-31418:
---

Assignee: Yuxin Tan

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Yuxin Tan
>Priority: Critical
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}





[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699913#comment-17699913
 ] 

Yingjie Cao commented on FLINK-31418:
-

It is a test issue; we will fix it soon.

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Critical
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}





[jira] [Comment Edited] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699561#comment-17699561
 ] 

Yingjie Cao edited comment on FLINK-31418 at 3/13/23 10:51 AM:
---

I will also try to reproduce it. If it cannot be reproduced, let's downgrade this 
issue. I think it does not need to be a blocker, because it may have existed since 
1.16 and there has been no relevant change recently.


was (Author: kevin.cyj):
I will also try to reproduce it. If it can be reproduced, let's downgrade this 
issue. I think it does not need to be a blocker, because it may have existed since 
1.16 and there has been no relevant change recently.

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Blocker
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}





[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699561#comment-17699561
 ] 

Yingjie Cao commented on FLINK-31418:
-

I will also try to reproduce it. If it can be reproduced, let's downgrade this 
issue. I think it does not need to be a blocker, because it may have existed since 
1.16 and there has been no relevant change recently.

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Blocker
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}





[jira] [Commented] (FLINK-31312) EnableObjectReuse cause different behaviors

2023-03-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699482#comment-17699482
 ] 

Yingjie Cao commented on FLINK-31312:
-

Sorry, I did not notice that the constructor is not public. 
Personally, I do not think it is a bug or a feature; I would treat it as a 
weakness in the API design.

> EnableObjectReuse cause different behaviors
> ---
>
> Key: FLINK-31312
> URL: https://issues.apache.org/jira/browse/FLINK-31312
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jiang Xin
>Priority: Major
>
> I have the following test code, which fails with the exception `Accessing a 
> field by name is not supported in position-based field mode`; however, if I 
> remove the `enableObjectReuse`, it works. 
> The `SourceFunction` generates rows without field names, but the return type 
> info is assigned by `env.addSource(rowGenerator, typeInfo)`.
> With object reuse enabled, rows are passed to the MapFunction directly, 
> so the exception is raised. If object reuse is disabled, rows are 
> reconstructed and given field names when passed to the next operator, and 
> the test works well.
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     // The test fails with enableObjectReuse
>     env.getConfig().enableObjectReuse();
>     final SourceFunction<Row> rowGenerator =
>             new SourceFunction<Row>() {
>                 @Override
>                 public final void run(SourceContext<Row> ctx) throws Exception {
>                     Row row = new Row(1);
>                     row.setField(0, "a");
>                     ctx.collect(row);
>                 }
>
>                 @Override
>                 public void cancel() {}
>             };
>     final RowTypeInfo typeInfo =
>             new RowTypeInfo(new TypeInformation[] {Types.STRING}, new String[] {"col1"});
>     DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
>     DataStream<Row> transformedDataStream =
>             dataStream.map(
>                     (MapFunction<Row, Row>) value -> Row.of(value.getField("col1")), typeInfo);
>     transformedDataStream.addSink(new PrintSinkFunction<>());
>     env.execute("Mini Test");
> } {code}
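The behavior described in the quoted report can be modeled with a small toy (ToyRow is purely illustrative and not Flink's actual Row API): with object reuse, the position-only row from the source reaches the map function unchanged and by-name access fails; without reuse, the copy made between operators reattaches the declared field names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the object-reuse behavior; ToyRow stands in for Flink's Row. */
class ToyRow {
    final Object[] byPosition;
    final Map<String, Integer> nameToPosition; // null => position-based mode

    ToyRow(Object[] byPosition, Map<String, Integer> nameToPosition) {
        this.byPosition = byPosition;
        this.nameToPosition = nameToPosition;
    }

    Object getField(String name) {
        if (nameToPosition == null) {
            throw new IllegalStateException(
                    "Accessing a field by name is not supported in position-based field mode");
        }
        return byPosition[nameToPosition.get(name)];
    }

    /** Models the copy between operators when object reuse is disabled: the
     *  record is rebuilt against the declared type info, which attaches the
     *  field names the source never set. */
    static ToyRow copyWithTypeInfo(ToyRow row, String[] fieldNames) {
        Map<String, Integer> names = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) names.put(fieldNames[i], i);
        return new ToyRow(row.byPosition.clone(), names);
    }
}
```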





[jira] [Closed] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-03-10 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-31386.
---
Resolution: Fixed

> Fix the potential deadlock issue of blocking shuffle
> 
>
> Key: FLINK-31386
> URL: https://issues.apache.org/jira/browse/FLINK-31386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, the SortMergeResultPartition may allocate more network buffers 
> than the guaranteed size of the LocalBufferPool. As a result, some result 
> partitions may need to wait for other result partitions to release the 
> over-allocated network buffers before they can continue. However, the result 
> partitions that have allocated more than the guaranteed number of buffers rely 
> on the processing of input data to trigger data spilling and buffer recycling. 
> The input data in turn relies on the batch reading buffers used by the 
> SortMergeResultPartitionReadScheduler, which may already be taken by those 
> blocked result partitions that are waiting for buffers. Then a deadlock 
> occurs. We can fix this deadlock by reserving the guaranteed buffers 
> at initialization.





[jira] [Commented] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-03-10 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698953#comment-17698953
 ] 

Yingjie Cao commented on FLINK-31386:
-

Merged into 1.17 to unblock the release. Will cherry-pick to master later.

> Fix the potential deadlock issue of blocking shuffle
> 
>
> Key: FLINK-31386
> URL: https://issues.apache.org/jira/browse/FLINK-31386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, the SortMergeResultPartition may allocate more network buffers 
> than the guaranteed size of the LocalBufferPool. As a result, some result 
> partitions may need to wait for other result partitions to release the 
> over-allocated network buffers before they can continue. However, the result 
> partitions that have allocated more than the guaranteed number of buffers rely 
> on the processing of input data to trigger data spilling and buffer recycling. 
> The input data in turn relies on the batch reading buffers used by the 
> SortMergeResultPartitionReadScheduler, which may already be taken by those 
> blocked result partitions that are waiting for buffers. Then a deadlock 
> occurs. We can fix this deadlock by reserving the guaranteed buffers 
> at initialization.





[jira] [Assigned] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-03-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-31386:
---

Assignee: Yingjie Cao

> Fix the potential deadlock issue of blocking shuffle
> 
>
> Key: FLINK-31386
> URL: https://issues.apache.org/jira/browse/FLINK-31386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Blocker
> Fix For: 1.17.0
>
>
> Currently, the SortMergeResultPartition may allocate more network buffers 
> than the guaranteed size of the LocalBufferPool. As a result, some result 
> partitions may need to wait for other result partitions to release the 
> over-allocated network buffers before they can continue. However, the result 
> partitions that have allocated more than the guaranteed number of buffers rely 
> on the processing of input data to trigger data spilling and buffer recycling. 
> The input data in turn relies on the batch reading buffers used by the 
> SortMergeResultPartitionReadScheduler, which may already be taken by those 
> blocked result partitions that are waiting for buffers. Then a deadlock 
> occurs. We can fix this deadlock by reserving the guaranteed buffers 
> at initialization.





[jira] [Created] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-03-09 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-31386:
---

 Summary: Fix the potential deadlock issue of blocking shuffle
 Key: FLINK-31386
 URL: https://issues.apache.org/jira/browse/FLINK-31386
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


Currently, the SortMergeResultPartition may allocate more network buffers than 
the guaranteed size of the LocalBufferPool. As a result, some result partitions 
may need to wait for other result partitions to release the over-allocated network 
buffers before they can continue. However, the result partitions that have allocated 
more than the guaranteed number of buffers rely on the processing of input data to 
trigger data spilling and buffer recycling. The input data in turn relies on the 
batch reading buffers used by the SortMergeResultPartitionReadScheduler, which may 
already be taken by those blocked result partitions that are waiting for buffers. 
Then a deadlock occurs. We can fix this deadlock by reserving the guaranteed 
buffers at initialization.
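The reserve-on-init fix can be sketched as follows (an illustrative model, not Flink's actual classes): each pool claims its full guarantee from the shared budget when it is created, so it either starts with everything it is entitled to or fails fast, instead of later waiting on buffers held by an over-allocating peer.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch of the fix: each result partition reserves its
 *  guaranteed buffers when its pool is created. */
class ReservingBufferPool {
    private final AtomicInteger globalFree;
    private final int guaranteed;

    ReservingBufferPool(AtomicInteger globalFree, int guaranteed) {
        // Reserve the full guarantee at initialization: either it succeeds
        // now, or we fail fast instead of deadlocking under load later.
        if (globalFree.addAndGet(-guaranteed) < 0) {
            globalFree.addAndGet(guaranteed); // roll back the failed reservation
            throw new IllegalStateException("not enough buffers to reserve " + guaranteed);
        }
        this.globalFree = globalFree;
        this.guaranteed = guaranteed;
    }

    /** Return the reserved guarantee to the shared budget. */
    void release() {
        globalFree.addAndGet(guaranteed);
    }
}
```

Because the guarantee is held for the pool's whole lifetime, no partition's progress can depend on another partition releasing buffers out of its guaranteed share, which breaks the circular wait.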





[jira] [Commented] (FLINK-31266) dashboard info error (received and send alway show 0 when having data)

2023-03-09 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698264#comment-17698264
 ] 

Yingjie Cao commented on FLINK-31266:
-

Would something like NaN be better than 0? BTW, do newer Flink versions still 
have the same behavior?

> dashboard info error (received and send alway show 0 when having data)
> --
>
> Key: FLINK-31266
> URL: https://issues.apache.org/jira/browse/FLINK-31266
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.4
>Reporter: linqichen
>Priority: Major
> Attachments: receivedAndSend0.jpg
>
>
> !receivedAndSend0.jpg!





[jira] [Commented] (FLINK-31312) EnableObjectReuse cause different behaviors

2023-03-09 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698249#comment-17698249
 ] 

Yingjie Cao commented on FLINK-31312:
-

There is another constructor in Row:

Row(
RowKind kind,
@Nullable Object[] fieldByPosition,
@Nullable Map<String, Object> fieldByName,
@Nullable LinkedHashMap<String, Integer> positionByName);

I guess if this one is used, there should be no problem.

> EnableObjectReuse cause different behaviors
> ---
>
> Key: FLINK-31312
> URL: https://issues.apache.org/jira/browse/FLINK-31312
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jiang Xin
>Priority: Major
>
> I have the following test code, which fails with the exception `Accessing a 
> field by name is not supported in position-based field mode`; however, if I 
> remove the `enableObjectReuse`, it works. 
> The `SourceFunction` generates rows without field names, but the return type 
> info is assigned by `env.addSource(rowGenerator, typeInfo)`.
> With object reuse enabled, rows are passed to the MapFunction directly, 
> so the exception is raised. If object reuse is disabled, rows are 
> reconstructed and given field names when passed to the next operator, and 
> the test works well.
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     // The test fails with enableObjectReuse
>     env.getConfig().enableObjectReuse();
>     final SourceFunction<Row> rowGenerator =
>             new SourceFunction<Row>() {
>                 @Override
>                 public final void run(SourceContext<Row> ctx) throws Exception {
>                     Row row = new Row(1);
>                     row.setField(0, "a");
>                     ctx.collect(row);
>                 }
>
>                 @Override
>                 public void cancel() {}
>             };
>     final RowTypeInfo typeInfo =
>             new RowTypeInfo(new TypeInformation[] {Types.STRING}, new String[] {"col1"});
>     DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
>     DataStream<Row> transformedDataStream =
>             dataStream.map(
>                     (MapFunction<Row, Row>) value -> Row.of(value.getField("col1")), typeInfo);
>     transformedDataStream.addSink(new PrintSinkFunction<>());
>     env.execute("Mini Test");
> } {code}





[jira] [Closed] (FLINK-28189) The taskmanager dynamically created by flink native k8s creates pods directly, why not create a deployment?

2023-02-23 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-28189.
---
Resolution: Not A Problem

> The taskmanager dynamically created by flink native k8s creates pods 
> directly, why not create a deployment?
> ---
>
> Key: FLINK-28189
> URL: https://issues.apache.org/jira/browse/FLINK-28189
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.14.4
>Reporter: hob007
>Priority: Major
>
> I am using the Flink native K8s deployment. The pods are dynamically 
> created, but there is a problem with K8s cluster monitoring: I cannot 
> monitor the TaskManager pods' status because these pods have no owning 
> Deployment.
>  
> So my question is:
> The TaskManagers dynamically created by Flink native K8s are created as 
> pods directly; why not create a Deployment?
>  
> For example:
> Define a Deployment and use replicas to control the TaskManager pods.
>  
> Chinese description reference:
> https://issues.apache.org/jira/browse/FLINK-28167





[jira] [Closed] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2023-02-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-28378.
---
Resolution: Won't Fix

> Use larger data reading buffer size for sort-shuffle
> 
>
> Key: FLINK-28378
> URL: https://issues.apache.org/jira/browse/FLINK-28378
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, for sort shuffle, we always use the network buffer size as the 
> data reading buffer size, which is 32K by default. We can increase this 
> buffer size for better performance.





[jira] [Updated] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2023-02-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28378:

Fix Version/s: (was: 1.17.0)

> Use larger data reading buffer size for sort-shuffle
> 
>
> Key: FLINK-28378
> URL: https://issues.apache.org/jira/browse/FLINK-28378
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, for sort shuffle, we always use the network buffer size as the 
> data reading buffer size, which is 32K by default. We can increase this 
> buffer size for better performance.





[jira] [Updated] (FLINK-28326) ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError

2023-02-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28326:

Fix Version/s: 1.16.2

> ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError
> 
>
> Key: FLINK-28326
> URL: https://issues.apache.org/jira/browse/FLINK-28326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.17.0, 1.16.2
>
>
> {code:java}
> 2022-06-30T09:23:24.0469768Z Jun 30 09:23:24 [INFO] 
> 2022-06-30T09:23:24.0470382Z Jun 30 09:23:24 [ERROR] Failures: 
> 2022-06-30T09:23:24.0471581Z Jun 30 09:23:24 [ERROR]   
> ResultPartitionTest.testIdleAndBackPressuredTime:414 
> 2022-06-30T09:23:24.0472898Z Jun 30 09:23:24 Expected: a value greater than 
> <0L>
> 2022-06-30T09:23:24.0474090Z Jun 30 09:23:24  but: <0L> was equal to <0L>
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37406=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8
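For context on why an assertion like `greaterThan(0L)` on a measured duration is flaky: a sufficiently fast code path timed with a millisecond-resolution clock can legitimately measure 0 ms. The sketch below is a hypothetical illustration of that failure mode, not the actual test code.

```java
public class TimerFlakiness {
    // Times a very cheap loop with a millisecond clock. On most runs the
    // loop finishes within the same clock tick, so this returns 0.
    static long timeCheapLoop() {
        long start = System.currentTimeMillis();
        long sum = 0;
        for (int i = 0; i < 10; i++) {
            sum += i; // very cheap work
        }
        System.out.println("sum=" + sum);
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        long elapsed = timeCheapLoop();
        // An assertion of the form assertThat(elapsed, greaterThan(0L))
        // fails whenever elapsed is 0, which is the common case here.
        System.out.println("elapsed=" + elapsed + " ms");
    }
}
```

Such assertions are typically stabilized by forcing measurable work between the two clock reads or by asserting `greaterThanOrEqualTo(0L)` instead.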





[jira] [Resolved] (FLINK-28326) ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError

2023-02-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28326.
-
Resolution: Fixed

> ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError
> 
>
> Key: FLINK-28326
> URL: https://issues.apache.org/jira/browse/FLINK-28326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.17.0, 1.16.2
>
>
> {code:java}
> 2022-06-30T09:23:24.0469768Z Jun 30 09:23:24 [INFO] 
> 2022-06-30T09:23:24.0470382Z Jun 30 09:23:24 [ERROR] Failures: 
> 2022-06-30T09:23:24.0471581Z Jun 30 09:23:24 [ERROR]   
> ResultPartitionTest.testIdleAndBackPressuredTime:414 
> 2022-06-30T09:23:24.0472898Z Jun 30 09:23:24 Expected: a value greater than 
> <0L>
> 2022-06-30T09:23:24.0474090Z Jun 30 09:23:24  but: <0L> was equal to <0L>
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37406=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8





[jira] [Commented] (FLINK-28326) ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError

2023-02-07 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685678#comment-17685678
 ] 

Yingjie Cao commented on FLINK-28326:
-

1.16: 096c4a5e29cdffc8b4ed72f7bc6fc7b42dab2e9b

> ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError
> 
>
> Key: FLINK-28326
> URL: https://issues.apache.org/jira/browse/FLINK-28326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-06-30T09:23:24.0469768Z Jun 30 09:23:24 [INFO] 
> 2022-06-30T09:23:24.0470382Z Jun 30 09:23:24 [ERROR] Failures: 
> 2022-06-30T09:23:24.0471581Z Jun 30 09:23:24 [ERROR]   
> ResultPartitionTest.testIdleAndBackPressuredTime:414 
> 2022-06-30T09:23:24.0472898Z Jun 30 09:23:24 Expected: a value greater than 
> <0L>
> 2022-06-30T09:23:24.0474090Z Jun 30 09:23:24  but: <0L> was equal to <0L>
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37406=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8





[jira] [Updated] (FLINK-28326) ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError

2023-02-02 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28326:

Priority: Major  (was: Minor)

> ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError
> 
>
> Key: FLINK-28326
> URL: https://issues.apache.org/jira/browse/FLINK-28326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-06-30T09:23:24.0469768Z Jun 30 09:23:24 [INFO] 
> 2022-06-30T09:23:24.0470382Z Jun 30 09:23:24 [ERROR] Failures: 
> 2022-06-30T09:23:24.0471581Z Jun 30 09:23:24 [ERROR]   
> ResultPartitionTest.testIdleAndBackPressuredTime:414 
> 2022-06-30T09:23:24.0472898Z Jun 30 09:23:24 Expected: a value greater than 
> <0L>
> 2022-06-30T09:23:24.0474090Z Jun 30 09:23:24  but: <0L> was equal to <0L>
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37406=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8





[jira] [Updated] (FLINK-28326) ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError

2023-02-02 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28326:

Priority: Minor  (was: Critical)

> ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError
> 
>
> Key: FLINK-28326
> URL: https://issues.apache.org/jira/browse/FLINK-28326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Minor
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-06-30T09:23:24.0469768Z Jun 30 09:23:24 [INFO] 
> 2022-06-30T09:23:24.0470382Z Jun 30 09:23:24 [ERROR] Failures: 
> 2022-06-30T09:23:24.0471581Z Jun 30 09:23:24 [ERROR]   
> ResultPartitionTest.testIdleAndBackPressuredTime:414 
> 2022-06-30T09:23:24.0472898Z Jun 30 09:23:24 Expected: a value greater than 
> <0L>
> 2022-06-30T09:23:24.0474090Z Jun 30 09:23:24  but: <0L> was equal to <0L>
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37406=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8





[jira] [Commented] (FLINK-28326) ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError

2023-02-02 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683651#comment-17683651
 ] 

Yingjie Cao commented on FLINK-28326:
-

master: 7980d4b7956bbb5952933d6f4c49e27f59c96834

> ResultPartitionTest.testIdleAndBackPressuredTime failed with AssertError
> 
>
> Key: FLINK-28326
> URL: https://issues.apache.org/jira/browse/FLINK-28326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-06-30T09:23:24.0469768Z Jun 30 09:23:24 [INFO] 
> 2022-06-30T09:23:24.0470382Z Jun 30 09:23:24 [ERROR] Failures: 
> 2022-06-30T09:23:24.0471581Z Jun 30 09:23:24 [ERROR]   
> ResultPartitionTest.testIdleAndBackPressuredTime:414 
> 2022-06-30T09:23:24.0472898Z Jun 30 09:23:24 Expected: a value greater than 
> <0L>
> 2022-06-30T09:23:24.0474090Z Jun 30 09:23:24  but: <0L> was equal to <0L>
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37406=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8





[jira] [Commented] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease

2023-01-18 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678444#comment-17678444
 ] 

Yingjie Cao commented on FLINK-29884:
-

Merged into master via 0b8a83ce54d39d0d5a5b82573c5037f306e9f7f7.

> Flaky test failure in 
> finegrained_resource_management/SortMergeResultPartitionTest.testRelease
> --
>
> Key: FLINK-29884
> URL: https://issues.apache.org/jira/browse/FLINK-29884
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Network, Tests
>Affects Versions: 1.17.0
>Reporter: Nico Kruber
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
>
> {{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the 
> finegrained_resource_management tests:
> {code:java}
> Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 64.649 s <<< FAILURE! - in 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
> Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time 
> elapsed: 60.009 s  <<< ERROR!
> Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out 
> after 60 seconds
> Nov 03 17:28:07   at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> Nov 03 17:28:07   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 03 17:28:07   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 03 17:28:07   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 03 17:28:07   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 03 17:28:07   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 03 17:28:07   at java.lang.Thread.run(Thread.java:748) {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]





[jira] [Closed] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease

2023-01-18 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-29884.
---
Resolution: Fixed

> Flaky test failure in 
> finegrained_resource_management/SortMergeResultPartitionTest.testRelease
> --
>
> Key: FLINK-29884
> URL: https://issues.apache.org/jira/browse/FLINK-29884
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Network, Tests
>Affects Versions: 1.17.0
>Reporter: Nico Kruber
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
>
> {{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the 
> finegrained_resource_management tests:
> {code:java}
> Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 64.649 s <<< FAILURE! - in 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
> Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time 
> elapsed: 60.009 s  <<< ERROR!
> Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out 
> after 60 seconds
> Nov 03 17:28:07   at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> Nov 03 17:28:07   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 03 17:28:07   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 03 17:28:07   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 03 17:28:07   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 03 17:28:07   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 03 17:28:07   at java.lang.Thread.run(Thread.java:748) {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]





[jira] [Assigned] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease

2023-01-17 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-29884:
---

Assignee: Weijie Guo  (was: Yingjie Cao)

> Flaky test failure in 
> finegrained_resource_management/SortMergeResultPartitionTest.testRelease
> --
>
> Key: FLINK-29884
> URL: https://issues.apache.org/jira/browse/FLINK-29884
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Network, Tests
>Affects Versions: 1.17.0
>Reporter: Nico Kruber
>Assignee: Weijie Guo
>Priority: Major
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the 
> finegrained_resource_management tests:
> {code:java}
> Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 64.649 s <<< FAILURE! - in 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
> Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time 
> elapsed: 60.009 s  <<< ERROR!
> Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out 
> after 60 seconds
> Nov 03 17:28:07   at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> Nov 03 17:28:07   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 03 17:28:07   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 03 17:28:07   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 03 17:28:07   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 03 17:28:07   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 03 17:28:07   at java.lang.Thread.run(Thread.java:748) {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]





[jira] [Assigned] (FLINK-29845) ThroughputCalculator throws java.lang.IllegalArgumentException: Time should be non negative under very low throughput cluster

2023-01-17 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-29845:
---

Assignee: Weijie Guo

> ThroughputCalculator throws java.lang.IllegalArgumentException: Time should 
> be non negative under very low throughput cluster
> -
>
> Key: FLINK-29845
> URL: https://issues.apache.org/jira/browse/FLINK-29845
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.14.6
>Reporter: Jingxiao GU
>Assignee: Weijie Guo
>Priority: Major
>
> Our team is using Flink 1.14.6 to process data from Kafka.
> It works fine unless the same job jar with the same arguments is deployed 
> in an environment with *very low Kafka source throughput*.
> The job sometimes crashed with the following exception and could not 
> recover unless we restarted the TaskManagers, which is unacceptable in a 
> production environment.
> {code:java}
> [2022-10-31T15:33:57.153+08:00] [o.a.f.runtime.taskmanager.Task#cess 
> (2/16)#244] - [WARN ] KeyedProcess (2/16)#244 
> (b9b54f6445419fc43c4d58fcd95cee82) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalArgumentException: Time should be non negative
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
>   at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:789)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:781)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> After a rough check of the source code, we found that even if buffer 
> debloating is disabled 
> ([https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L427]
>  ), the buffer debloater is still scheduled 
> ([https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L755]
>  ), so the {{ThroughputCalculator}} keeps calculating the throughput 
> ([https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L789]
>  ), which causes a division by zero and seems unnecessary.
> Currently, we work around this by setting 
> {{taskmanager.network.memory.buffer-debloat.period: 365d}} to avoid the 
> buffer debloater being scheduled frequently and causing the random crash.
> P.S. We found a bug with similar stacktrace 
> https://issues.apache.org/jira/browse/FLINK-25454 which was fixed in 1.14.6.





[jira] [Commented] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease

2023-01-14 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676983#comment-17676983
 ] 

Yingjie Cao commented on FLINK-29884:
-

Sorry for the delay, I already found the root cause. It is only a test issue 
and I will submit a PR soon. 

> Flaky test failure in 
> finegrained_resource_management/SortMergeResultPartitionTest.testRelease
> --
>
> Key: FLINK-29884
> URL: https://issues.apache.org/jira/browse/FLINK-29884
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Network, Tests
>Affects Versions: 1.17.0
>Reporter: Nico Kruber
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the 
> finegrained_resource_management tests:
> {code:java}
> Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 64.649 s <<< FAILURE! - in 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
> Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time 
> elapsed: 60.009 s  <<< ERROR!
> Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out 
> after 60 seconds
> Nov 03 17:28:07   at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> Nov 03 17:28:07   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 03 17:28:07   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 03 17:28:07   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 03 17:28:07   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 03 17:28:07   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 03 17:28:07   at java.lang.Thread.run(Thread.java:748) {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]





[jira] [Closed] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.

2023-01-05 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-30511.
---
Resolution: Duplicate

> Ignore the Exception in user-timer Triggerable when recovering from state.
> --
>
> Key: FLINK-30511
> URL: https://issues.apache.org/jira/browse/FLINK-30511
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0
> java8
> deployment Mode: miniCluster in IDC; standalone, yarn-application.
>Reporter: RocMarshal
>Priority: Minor
> Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
>
>
> * Code segment:
> {code:java}
> public class OnTimerDemo {
> public static void main(String[] args) throws Exception {
> Configuration conf = new Configuration();
> conf.setString("taskmanager.numberOfTaskSlots", "4");
> conf.setString("state.checkpoint-storage", "filesystem");
> conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
> conf.setString("execution.checkpointing.interval", "30s");
> //conf.setString("execution.savepoint.path", 
> "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setParallelism(1);
> EnvironmentSettings envSetting = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, 
> envSetting);
> String sourceDDL = "CREATE TABLE orders (\n" +
> "  id   INT,\n" +
> "  app  INT,\n" +
> "  user_id  STRING" +
> ") WITH (\n" +
> "   'connector' = 'datagen',\n" +
> "   'rows-per-second'='1',\n" +
> "   'fields.app.min'='1',\n" +
> "   'fields.app.max'='10',\n" +
> "   'fields.user_id.length'='10'\n" +
> ")";
> tableEnv.executeSql(sourceDDL);
> Table query = tableEnv.sqlQuery("select * from orders");
> DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, 
> Row.class);
> TypeInformation[] returnTypes = new TypeInformation[4];
> returnTypes[0] = Types.INT;
> returnTypes[1] = Types.INT; // Anchor-B:
> returnTypes[2] = Types.INT;
> returnTypes[3] = Types.INT;
> rowDataStream.keyBy(new KeySelector<Row, String>() {
> @Override
> public String getKey(Row value) throws Exception {
> return value.getFieldAs(2);
> }
> }).process(new KeyedProcessFunction<String, Row, Row>() {
> private Row firstRow;
> @Override
> public void processElement(Row value, Context ctx, 
> Collector<Row> out) throws Exception {
> if (firstRow == null) {
> firstRow = value;
> }
> 
> ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 
> 3000);
> }
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, 
> Collector<Row> out) throws Exception {
> Row colRow = new Row(4);
> colRow.setField(0, 0);
> colRow.setField(1, 1);
> colRow.setField(2, 2);
> colRow.setField(3, 3);
> out.collect(colRow); // Anchor-C
> }
> }).name("TargetTestUDF")
> .returns(new RowTypeInfo(returnTypes))
> .print();
> env.execute(OnTimerDemo.class.getSimpleName());
> }
> }
>  {code}
>  * Recurrence steps
>  ** Run the job without state.
>  ** Collect the latest available checkpoint path as 'checkpoint-path-a'
>  ** Stop the job.
>  ** Fill the real value of 'checkpoint-path-a' into 'Anchor-A' line and 
> un-comment the line.
>  ** Set 'returnTypes[1] = Types.INT;' -> 'returnTypes[1] = Types.LONG;' at 
> the 'Anchor-B' line.
>  ** Then add break-point at 'StreamTask#handleAsyncException' method.
>  ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot 
> be cast to java.lang.Long' exception thrown at the 'Anchor-C' line is 
> ignored in the 'StreamTask#handleAsyncException' method.
>  ** As a result, the framework cannot surface this exception in this case.
>  * Root cause:
>  ** !截屏2022-12-27 18.51.12.png!
>  ** When job started from state data, the 

[jira] [Commented] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.

2023-01-05 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17654896#comment-17654896
 ] 

Yingjie Cao commented on FLINK-30511:
-

I am closing this ticket for now. We can discuss in FLINK-29816.

> Ignore the Exception in user-timer Triggerable when recovering from state.
> --
>
> Key: FLINK-30511
> URL: https://issues.apache.org/jira/browse/FLINK-30511
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0
> java8
> deployment Mode: miniCluster in IDC; standalone, yarn-application.
>Reporter: RocMarshal
>Priority: Minor
> Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
>
>
> * Code segment:
> {code:java}
> public class OnTimerDemo {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setString("taskmanager.numberOfTaskSlots", "4");
>         conf.setString("state.checkpoint-storage", "filesystem");
>         conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
>         conf.setString("execution.checkpointing.interval", "30s");
>         //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>         env.setParallelism(1);
>         EnvironmentSettings envSetting = EnvironmentSettings
>                 .newInstance()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);
>         String sourceDDL = "CREATE TABLE orders (\n" +
>                 "  id   INT,\n" +
>                 "  app  INT,\n" +
>                 "  user_id  STRING" +
>                 ") WITH (\n" +
>                 "   'connector' = 'datagen',\n" +
>                 "   'rows-per-second'='1',\n" +
>                 "   'fields.app.min'='1',\n" +
>                 "   'fields.app.max'='10',\n" +
>                 "   'fields.user_id.length'='10'\n" +
>                 ")";
>         tableEnv.executeSql(sourceDDL);
>         Table query = tableEnv.sqlQuery("select * from orders");
>         DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);
>         TypeInformation<?>[] returnTypes = new TypeInformation<?>[4];
>         returnTypes[0] = Types.INT;
>         returnTypes[1] = Types.INT; // Anchor-B:
>         returnTypes[2] = Types.INT;
>         returnTypes[3] = Types.INT;
>         rowDataStream.keyBy(new KeySelector<Row, String>() {
>             @Override
>             public String getKey(Row value) throws Exception {
>                 return value.getFieldAs(2);
>             }
>         }).process(new KeyedProcessFunction<String, Row, Row>() {
>             private Row firstRow;
>             @Override
>             public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
>                 if (firstRow == null) {
>                     firstRow = value;
>                 }
>                 ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
>             }
>             @Override
>             public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
>                 Row colRow = new Row(4);
>                 colRow.setField(0, 0);
>                 colRow.setField(1, 1);
>                 colRow.setField(2, 2);
>                 colRow.setField(3, 3);
>                 out.collect(colRow); // Anchor-C
>             }
>         }).name("TargetTestUDF")
>                 .returns(new RowTypeInfo(returnTypes))
>                 .print();
>         env.execute(OnTimerDemo.class.getSimpleName());
>     }
> }
> {code}
>  * Recurrence steps
>  ** Run the job without state.
>  ** Collect the latest available checkpoint path as 'checkpoint-path-a'
>  ** Stop the job.
>  ** Fill the real value of 'checkpoint-path-a' into 'Anchor-A' line and 
> un-comment the line.
>  ** Set 'returnTypes[1] = Types.INT;' -> 'returnTypes[1] = Types.LONG;' at 
> the 'Anchor-B' line.
>  ** Then add break-point at 'StreamTask#handleAsyncException' method.
>  ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot 
> be cast to java.lang.Long' exception caused at the 'Anchor-C' line will be 
> ignored in the 'StreamTask#handleAsyncException' method.
>  ** So the framework can't catch the exception in this case.
>  * Root cause:

[jira] [Commented] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.

2023-01-05 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17654876#comment-17654876
 ] 

Yingjie Cao commented on FLINK-30511:
-

[~RocMarshal] Thanks for reporting. I guess an easy way to fix the issue is to 
throw the exception during state initialization, which seems to have been the 
default behavior before FLINK-17012.


[jira] [Commented] (FLINK-26088) Add Elasticsearch 8.0 support

2023-01-03 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653975#comment-17653975
 ] 

Yingjie Cao commented on FLINK-26088:
-

[~iture123] Any progress on this issue? In which version do you plan to 
implement this feature? We are also looking forward to it.

> Add Elasticsearch 8.0 support
> -
>
> Key: FLINK-26088
> URL: https://issues.apache.org/jira/browse/FLINK-26088
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: Yuhao Bi
>Assignee: zhenzhenhua
>Priority: Major
>
> Since Elasticsearch 8.0 is officially released, I think it's time to consider 
> adding es8 connector support.
> The High Level REST Client we used for connection [is marked deprecated in es 
> 7.15.0|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html].
>  Maybe we can migrate to use the new [Java API 
> Client|https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.0/index.html]
>  at this time.
> Elasticsearch8.0 release note: 
> [https://www.elastic.co/guide/en/elasticsearch/reference/8.0/release-notes-8.0.0.html]
> release highlights: 
> [https://www.elastic.co/guide/en/elasticsearch/reference/8.0/release-highlights.html]
> REST API compatibility: 
> https://www.elastic.co/guide/en/elasticsearch/reference/8.0/rest-api-compatibility.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29845) ThroughputCalculator throws java.lang.IllegalArgumentException: Time should be non negative under very low throughput cluster

2022-12-22 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651157#comment-17651157
 ] 

Yingjie Cao commented on FLINK-29845:
-

[~dawnwords] Would you like to fix it? If not, we will assign this issue to 
other contributors.

> ThroughputCalculator throws java.lang.IllegalArgumentException: Time should 
> be non negative under very low throughput cluster
> -
>
> Key: FLINK-29845
> URL: https://issues.apache.org/jira/browse/FLINK-29845
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.14.6
>Reporter: Jingxiao GU
>Priority: Major
>
> Our team is using Flink@1.14.6 to process data from Kafka.
> It works all fine unless the same job jar with the same arguments is deployed 
> in an environment with {color:#ff}*very low kafka source throughput*{color}.
> The job sometimes crashed with the following Exception and could not recover 
> unless we restarted the TaskManagers, which is unacceptable for a production 
> environment.
> {code:java}
> [2022-10-31T15:33:57.153+08:00] [o.a.f.runtime.taskmanager.Task#cess 
> (2/16)#244] - [WARN ] KeyedProcess (2/16)#244 
> (b9b54f6445419fc43c4d58fcd95cee82) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalArgumentException: Time should be non negative
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
>   at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:789)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:781)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the source code roughly, we found that if buffer debloating is 
> disabled 
> ([https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L427]
>  ), the buffer debloater is still scheduled 
> ([https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L755]
>  ), so the {{ThroughputCalculator}} keeps calculating the throughput 
> ([https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L789]
>  ), which causes a division by zero and seems unnecessary.
> As a workaround, we set 
> {{taskmanager.network.memory.buffer-debloat.period: 365d}} to keep the buffer 
> debloater from being scheduled frequently and causing the random crash.
> P.S. We found a bug with similar stacktrace 
> https://issues.apache.org/jira/browse/FLINK-25454 which was fixed in 1.14.6.
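For illustration, the failure mode and one possible guard can be sketched without Flink. The class and method names below are hypothetical simplifications, not the real ThroughputEMA/ThroughputCalculator code; they only show why a negative wall-clock delta trips the precondition, and how clamping the interval would avoid the crash.

```java
class ThroughputSketch {
    // Simplified throughput calculation; throws like the Preconditions
    // check in the stack trace above when the measured interval is negative.
    static long calculateThroughput(long bytes, long timeMillis) {
        if (timeMillis < 0) {
            throw new IllegalArgumentException("Time should be non negative");
        }
        return timeMillis == 0 ? 0 : bytes * 1000 / timeMillis;
    }

    // Possible guard: clamp the interval, since System.currentTimeMillis()
    // is wall-clock time and may jump backwards (e.g. NTP adjustments).
    static long calculateThroughputClamped(long bytes, long timeMillis) {
        return calculateThroughput(bytes, Math.max(0, timeMillis));
    }

    public static void main(String[] args) {
        System.out.println(calculateThroughputClamped(1024, -5));   // 0 instead of an exception
        System.out.println(calculateThroughputClamped(1024, 1000)); // 1024 bytes/s
    }
}
```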



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-12-22 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651155#comment-17651155
 ] 

Yingjie Cao commented on FLINK-30208:
-

[~xljtswf] Thanks for your explanation. I think there is no compatibility 
issue. But I am not sure about the performance gain. Do you have any test 
numbers?

> avoid unconditional state update in CountTrigger#onElement
> --
>
> Key: FLINK-30208
> URL: https://issues.apache.org/jira/browse/FLINK-30208
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: xljtswf
>Priority: Major
>
> In the current CountTrigger#onElement, when one element is received, the state 
> is updated unconditionally, and we then fetch the state again to check whether 
> we need to clear it. This implies we may update the state 2 times to 
> process one element. I propose the following simplification:
> public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
>         throws Exception {
>     TriggerResult triggerResult;
>     if (maxCount > 1) {
>         ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
>         Long currentCount = countState.get();
>         if (currentCount == null || currentCount < maxCount - 1) {
>             countState.add(1L);
>             triggerResult = TriggerResult.CONTINUE;
>         } else {
>             countState.clear();
>             triggerResult = TriggerResult.FIRE;
>         }
>     } else {
>         triggerResult = TriggerResult.FIRE;
>     }
>     return triggerResult;
> }
> If this is approved, I will make a pr then.
> Thanks!
>  
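For comparison, the two access patterns can be sketched with a plain-Java stand-in for the reducing state. The CountingState class below is hypothetical and only counts backend accesses; it is not Flink's ReducingState, and the two onElement variants are simplified paraphrases of the current and proposed logic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a ReducingState<Long> that counts
// how many times the state backend is touched.
class CountingState {
    Long value;
    final AtomicInteger accesses = new AtomicInteger();

    Long get() { accesses.incrementAndGet(); return value; }
    void add(long v) { accesses.incrementAndGet(); value = (value == null) ? v : value + v; }
    void clear() { accesses.incrementAndGet(); value = null; }
}

class CountTriggerSketch {
    // Current-style logic: always add, then get to check the threshold.
    static boolean currentOnElement(CountingState s, long maxCount) {
        s.add(1L);
        if (s.get() >= maxCount) { s.clear(); return true; } // FIRE
        return false;                                        // CONTINUE
    }

    // Proposed logic from the description: get first, add only below the threshold.
    static boolean proposedOnElement(CountingState s, long maxCount) {
        if (maxCount <= 1) return true;                      // FIRE immediately
        Long current = s.get();
        if (current == null || current < maxCount - 1) { s.add(1L); return false; }
        s.clear();
        return true;
    }

    public static void main(String[] args) {
        CountingState a = new CountingState();
        CountingState b = new CountingState();
        for (int i = 0; i < 3; i++) { currentOnElement(a, 3); proposedOnElement(b, 3); }
        // The proposed variant saves the extra write on the firing element.
        System.out.println("current accesses:  " + a.accesses.get());
        System.out.println("proposed accesses: " + b.accesses.get());
    }
}
```

With maxCount = 3 and three elements, the current variant touches the fake backend 7 times and the proposed one 6 times; most elements still need two accesses (get plus add) either way, which is what the follow-up comment below points out.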



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-12-08 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644663#comment-17644663
 ] 

Yingjie Cao commented on FLINK-30208:
-

After the change, for most cases the state will still be accessed twice (get 
and update), am I understanding right?

> avoid unconditional state update in CountTrigger#onElement
> --
>
> Key: FLINK-30208
> URL: https://issues.apache.org/jira/browse/FLINK-30208
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: xljtswf
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30217) Use ListState#update() to replace clear + add mode.

2022-12-08 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-30217:

Component/s: Runtime / State Backends
 (was: API / DataStream)

> Use ListState#update() to replace clear + add mode.
> ---
>
> Key: FLINK-30217
> URL: https://issues.apache.org/jira/browse/FLINK-30217
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: xljtswf
>Priority: Major
>
> When using ListState, I found that many times we need to clear the current 
> state and then add new values. This is especially common in 
> CheckpointedFunction#snapshotState, and it is slower than just using 
> ListState#update().
> Suppose we want to update the ListState to contain value1, value2, value3.
> With the current implementation, we first call ListState#clear(); this updates 
> the state 1 time.
> Then we add value1, value2, value3 to the state.
> If we use heap state, we need to search the stateTable 3 times and add 3 
> values to the list.
> This happens in memory and is not too bad.
> If we use RocksDB, we will call backend.db.merge() 3 times.
> In total, we update the state 4 times.
> The more values to be added, the more times we update the state.
> If we use ListState#update(), we just need to update the state 1 time. I think 
> this can save a lot of time.
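The operation counts can be sketched with a hypothetical in-memory stand-in for ListState. This is not Flink's actual state backend; the counting is only illustrative of the clear-plus-N-adds versus single-update difference described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a ListState<T> that counts backend operations.
class CountingListState<T> {
    final List<T> values = new ArrayList<>();
    int backendOps;

    void clear() { backendOps++; values.clear(); }
    // On a RocksDB-style backend each add() would be one merge() call.
    void add(T value) { backendOps++; values.add(value); }
    // update() replaces the whole list in a single backend write.
    void update(List<T> newValues) { backendOps++; values.clear(); values.addAll(newValues); }
}

class ListStateUpdateSketch {
    public static void main(String[] args) {
        List<String> snapshot = Arrays.asList("value1", "value2", "value3");

        CountingListState<String> clearThenAdd = new CountingListState<>();
        clearThenAdd.clear();                          // 1 backend op
        for (String v : snapshot) clearThenAdd.add(v); // 3 more backend ops

        CountingListState<String> singleUpdate = new CountingListState<>();
        singleUpdate.update(snapshot);                 // 1 backend op

        System.out.println("clear+add ops: " + clearThenAdd.backendOps); // 4
        System.out.println("update ops:    " + singleUpdate.backendOps); // 1
    }
}
```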



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease

2022-11-17 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635272#comment-17635272
 ] 

Yingjie Cao commented on FLINK-29884:
-

I will try to reproduce it and will post an update if there is any progress.

> Flaky test failure in 
> finegrained_resource_management/SortMergeResultPartitionTest.testRelease
> --
>
> Key: FLINK-29884
> URL: https://issues.apache.org/jira/browse/FLINK-29884
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Network, Tests
>Affects Versions: 1.17.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the 
> finegrained_resource_management tests:
> {code:java}
> Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 64.649 s <<< FAILURE! - in 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
> Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time 
> elapsed: 60.009 s  <<< ERROR!
> Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out 
> after 60 seconds
> Nov 03 17:28:07   at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> Nov 03 17:28:07   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 03 17:28:07   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 03 17:28:07   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 03 17:28:07   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 03 17:28:07   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 03 17:28:07   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 03 17:28:07   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 03 17:28:07   at java.lang.Thread.run(Thread.java:748) {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29845) ThroughputCalculator throws java.lang.IllegalArgumentException: Time should be non negative under very low throughput cluster

2022-11-17 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635261#comment-17635261
 ] 

Yingjie Cao commented on FLINK-29845:
-

I think System.currentTimeMillis() is not guaranteed to be monotonic, so it is 
better to fix this. Do you already have a fix? [~dawnwords]

> ThroughputCalculator throws java.lang.IllegalArgumentException: Time should 
> be non negative under very low throughput cluster
> -
>
> Key: FLINK-29845
> URL: https://issues.apache.org/jira/browse/FLINK-29845
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.14.6
>Reporter: Jingxiao GU
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29816) User function exception in ProcessWindowFunction was called before invoke while restoring state (subtask was in INITIALIZING state), but StreamTask skips handling the Exception

2022-11-17 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635247#comment-17635247
 ] 

Yingjie Cao commented on FLINK-29816:
-

Any update on this issue?

> User function exception in ProcessWindowFunction was called before invoke 
> while restoring state (subtask was in INITIALIZING state), but StreamTask 
> skips handling the Exception
> -
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0, 1.16.0, 1.15.3
>Reporter: Xie Yi
>Priority: Major
> Attachments: image-2022-10-31-19-49-52-432.png, 
> image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, 
> image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, 
> image-2022-11-02-11-10-25-508.png
>
>
> h4. 1. How to reproduce
> Use a ProcessWindowFunction and throw an exception in process().
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setCheckpointTimeout(6);
>     KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>             .setBootstrapServers("")
>             .setTopics("")
>             .setGroupId("")
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .build();
>     DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
>     SingleOutputStreamOperator<String> mapSourse = kafkaSource.keyBy(s -> s)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>             .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
>                     // when processing the event "abc", this causes java.lang.NumberFormatException
>                     Integer intS = Integer.valueOf(s);
>                     collector.collect(s);
>                 }
>             })
>             .name("name-process").uid("uid-process");
>     mapSourse.print();
>     env.execute();
> }
> {code}
> kafka input event
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Fault phenomena
> When the job processes the event "abc", it causes a 
> java.lang.NumberFormatException and fails over, then retries and fails over 
> continuously.
> However, it only failed over 2 times (attempt 0 and attempt 1); on the third 
> attempt, it worked normally with no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed in attempt 1, before failover exception 1:
> {code:java}
> 2022-10-31 16:59:53,644 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
> checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 
> 7bca78a75b089d447bb4c99efcfd6527.2022-10-31 16:59:54,010 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
> checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, 
> checkpointDuration=333 ms, finalizationTime=72 ms).  {code}
>  
> Attempt 2 was restored from the checkpoint:
> {code:java}
> 2022-10-31 17:00:30,033 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 
> 7bca78a75b089d447bb4c99efcfd6527 located at 
> hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>  
>  
> h4. 3. Possible reasons
> During attempt 2, the task restored from the checkpoint; the user function in 
> the ProcessWindowFunction was called in StreamTask.restore and produced a 
> "java.lang.NumberFormatException". However, StreamTask caught the exception 
> and didn't handle it because the subtask was not in the RUNNING state.
> *The stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> 

[jira] [Commented] (FLINK-29809) REST API for running a Flink job

2022-11-17 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635244#comment-17635244
 ] 

Yingjie Cao commented on FLINK-29809:
-

Do you mean passing a URL to Flink through the REST API, so that the Flink app 
itself fetches the jar from the URL and runs it?

> REST API for running a Flink job
> 
>
> Key: FLINK-29809
> URL: https://issues.apache.org/jira/browse/FLINK-29809
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Nguyễn Minh Phú
>Priority: Major
>
> When I want to submit a Flink job, I have to run `flink run ...` or submit a 
> jar via the Flink web UI. But in our production environment, we cannot connect 
> to the Flink server to run the command or submit a jar file via the web. So I 
> need a REST API to trigger a jar file on the Flink server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28561) Merge subpartition shuffle data read request for better sequential IO

2022-11-03 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28561:

Priority: Minor  (was: Major)

> Merge subpartition shuffle data read request for better sequential IO
> -
>
> Key: FLINK-28561
> URL: https://issues.apache.org/jira/browse/FLINK-28561
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Minor
>
> Currently, the shuffle data of each subpartition for blocking shuffle is read 
> separately. To achieve better performance and reduce IOPS, we can merge 
> consecutive data requests for the same file and serve them in one 
> IO request. More specifically:
> 1) if multiple data requests read the same data, for example, broadcast data, 
> the reader will read the data only once and send the same 
> piece of data to multiple downstream consumers;
> 2) if multiple data requests read consecutive regions of one file, we 
> will merge those requests into one large request and read a 
> larger chunk of data sequentially, which is good for file IO performance.
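The merging idea in point 2) can be sketched as interval coalescing over (offset, length) read requests sorted by offset. The class and record names below are hypothetical illustrations, not Flink's actual scheduler code:

```java
import java.util.ArrayList;
import java.util.List;

// Coalesces consecutive (offset, length) read requests into larger
// sequential reads -- the core idea behind merging subpartition requests.
public class ReadRequestMerger {
    record Request(long offset, long length) {}

    static List<Request> merge(List<Request> sortedByOffset) {
        List<Request> merged = new ArrayList<>();
        for (Request r : sortedByOffset) {
            if (!merged.isEmpty()) {
                Request last = merged.get(merged.size() - 1);
                if (last.offset() + last.length() >= r.offset()) {
                    // Adjacent or overlapping: extend the previous request.
                    long end = Math.max(last.offset() + last.length(),
                                        r.offset() + r.length());
                    merged.set(merged.size() - 1,
                               new Request(last.offset(), end - last.offset()));
                    continue;
                }
            }
            merged.add(r);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Request> reqs = List.of(
                new Request(0, 100), new Request(100, 50), new Request(200, 10));
        // The first two requests are adjacent and become one sequential read;
        // the third starts after a gap and stays separate.
        System.out.println(merge(reqs));
    }
}
```

The same coalescing also covers case 1): identical requests fully overlap, so they collapse into a single read whose result can be fanned out to all consumers.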



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28561) Merge subpartition shuffle data read request for better sequential IO

2022-11-03 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28561:

Fix Version/s: (was: 1.17.0)

> Merge subpartition shuffle data read request for better sequential IO
> -
>
> Key: FLINK-28561
> URL: https://issues.apache.org/jira/browse/FLINK-28561
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the shuffle data of each subpartition for blocking shuffle is read 
> separately. To achieve better performance and reduce IOPS, we can merge 
> consecutive data requests for the same file and serve them in one 
> IO request. More specifically:
> 1) if multiple data requests read the same data, for example, broadcast data, 
> the reader will read the data only once and send the same 
> piece of data to multiple downstream consumers;
> 2) if multiple data requests read consecutive regions of one file, we 
> will merge those requests into one large request and read a 
> larger chunk of data sequentially, which is good for file IO performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29573) Flink does not work for windows if installing on other drive except for C drive

2022-11-03 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628157#comment-17628157
 ] 

Yingjie Cao commented on FLINK-29573:
-

[~kriswang] Thanks for reporting this issue. Do you know whether this issue also 
exists in newer Flink versions?

> Flink does not work for windows if installing on other drive except for C 
> drive
> ---
>
> Key: FLINK-29573
> URL: https://issues.apache.org/jira/browse/FLINK-29573
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.14.4
>Reporter: kris wang
>Priority: Major
>
> {{flink_service.bat}} --> {{flink_inner.bat}} --> {{start-cluster.sh}} --> {{config.sh}}
> The {{config.sh}} under _*D:\x\flink-1.14.4\bin*_ (Flink’s 
> directory) has {{runBashJavaUtilsCmd()}} --> the value of 
> {{class_path}} derived after mangling the path is
> *D;C:\x\flink-1.14.4\bin\bash-java-utils.jar;D:\x\flink-1.14.4\lib\flink-dist_2.12-1.14.4.jar*
> Here, {{runBashJavaUtilsCmd()}} internally sets {{class_path}} by calling a 
> function named {{manglePathList}}.
> {{manglePathList}} causes this behavior, where 
> _D:\x\flink-1.13.5/bin/bash-java-utils.jar_ gets converted to 
> _C:\x\flink-1.14.4\bin\bash-java-utils.jar_ 
>  
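The mangled `class_path` above (an entry `D` split off, the drive letter rewritten to `C:`) is the classic symptom of treating `:` as a path-list separator on Windows, where it also delimits the drive letter. A minimal sketch of the failure mode, with a hypothetical class name and not Flink's actual `manglePathList` logic:

```java
import java.util.Arrays;

// Illustrates why splitting a Windows path on ':' (the Unix path-list
// separator) corrupts entries: in 'D:\...', the colon is a drive prefix,
// not a separator, so the drive letter becomes its own bogus entry.
public class DriveLetterSplit {
    static String[] naiveSplit(String pathList) {
        return pathList.split(":");
    }

    public static void main(String[] args) {
        String jar = "D:\\x\\flink-1.14.4\\bin\\bash-java-utils.jar";
        // Prints the drive letter 'D' as a separate, meaningless entry.
        System.out.println(Arrays.toString(naiveSplit(jar)));
    }
}
```

Any shell-level mangling that later "repairs" such a stray relative entry by prepending a default drive would produce exactly the `D` -> `C:` rewrite reported above.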



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29008) SortMergeResultPartitionTest.testRelease failed with TestTimedOutException

2022-10-27 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-29008.
---
Resolution: Cannot Reproduce

> SortMergeResultPartitionTest.testRelease failed with TestTimedOutException
> --
>
> Key: FLINK-29008
> URL: https://issues.apache.org/jira/browse/FLINK-29008
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-08-17T02:58:01.8887337Z Aug 17 02:58:01 [ERROR] 
> SortMergeResultPartitionTest.testRelease  Time elapsed: 60.843 s  <<< ERROR!
> 2022-08-17T02:58:01.8887851Z Aug 17 02:58:01 
> org.junit.runners.model.TestTimedOutException: test timed out after 60 seconds
> 2022-08-17T02:58:01.8891703Z Aug 17 02:58:01  at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> 2022-08-17T02:58:01.8892705Z Aug 17 02:58:01  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-08-17T02:58:01.8893282Z Aug 17 02:58:01  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-08-17T02:58:01.8893966Z Aug 17 02:58:01  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-08-17T02:58:01.8898552Z Aug 17 02:58:01  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-08-17T02:58:01.8899210Z Aug 17 02:58:01  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-08-17T02:58:01.8899844Z Aug 17 02:58:01  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-08-17T02:58:01.8900581Z Aug 17 02:58:01  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-08-17T02:58:01.8901226Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-08-17T02:58:01.8907420Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-08-17T02:58:01.8908068Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-08-17T02:58:01.8908735Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2022-08-17T02:58:01.8909447Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2022-08-17T02:58:01.8910255Z Aug 17 02:58:01  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2022-08-17T02:58:01.8915691Z Aug 17 02:58:01  at 
> java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40084=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29008) SortMergeResultPartitionTest.testRelease failed with TestTimedOutException

2022-10-27 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624937#comment-17624937
 ] 

Yingjie Cao commented on FLINK-29008:
-

I ran this test more than 1000 times locally and the issue didn't reproduce. I 
also checked the code but found nothing. As it hasn't appeared for over two 
months, I am closing it. Feel free to reopen it if it still reproduces.

> SortMergeResultPartitionTest.testRelease failed with TestTimedOutException
> --
>
> Key: FLINK-29008
> URL: https://issues.apache.org/jira/browse/FLINK-29008
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-08-17T02:58:01.8887337Z Aug 17 02:58:01 [ERROR] 
> SortMergeResultPartitionTest.testRelease  Time elapsed: 60.843 s  <<< ERROR!
> 2022-08-17T02:58:01.8887851Z Aug 17 02:58:01 
> org.junit.runners.model.TestTimedOutException: test timed out after 60 seconds
> 2022-08-17T02:58:01.8891703Z Aug 17 02:58:01  at 
> org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
> 2022-08-17T02:58:01.8892705Z Aug 17 02:58:01  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-08-17T02:58:01.8893282Z Aug 17 02:58:01  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-08-17T02:58:01.8893966Z Aug 17 02:58:01  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-08-17T02:58:01.8898552Z Aug 17 02:58:01  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-08-17T02:58:01.8899210Z Aug 17 02:58:01  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-08-17T02:58:01.8899844Z Aug 17 02:58:01  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-08-17T02:58:01.8900581Z Aug 17 02:58:01  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-08-17T02:58:01.8901226Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-08-17T02:58:01.8907420Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-08-17T02:58:01.8908068Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-08-17T02:58:01.8908735Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2022-08-17T02:58:01.8909447Z Aug 17 02:58:01  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2022-08-17T02:58:01.8910255Z Aug 17 02:58:01  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2022-08-17T02:58:01.8915691Z Aug 17 02:58:01  at 
> java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40084=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29296) OperatorCoordinatorHolder.create throws NPE

2022-10-20 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-29296.
---
Resolution: Fixed

> OperatorCoordinatorHolder.create throws NPE
> ---
>
> Key: FLINK-29296
> URL: https://issues.apache.org/jira/browse/FLINK-29296
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Xingbo Huang
>Priority: Critical
>
> {code:java}
> 2022-09-13T15:22:42.3864318Z Sep 13 15:22:42 [ERROR] Tests run: 8, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 5.633 s <<< FAILURE! - in 
> org.apache.flink.test.streaming.runtime.SourceNAryInputChainingITCase
> 2022-09-13T15:22:42.3865377Z Sep 13 15:22:42 [ERROR] 
> org.apache.flink.test.streaming.runtime.SourceNAryInputChainingITCase.testDirectSourcesOnlyExecution
>   Time elapsed: 0.165 s  <<< ERROR!
> 2022-09-13T15:22:42.3867571Z Sep 13 15:22:42 java.lang.RuntimeException: 
> Failed to fetch next result
> 2022-09-13T15:22:42.3919112Z Sep 13 15:22:42  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2022-09-13T15:22:42.3920935Z Sep 13 15:22:42  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2022-09-13T15:22:42.3922442Z Sep 13 15:22:42  at 
> org.apache.flink.streaming.api.datastream.DataStreamUtils.collectBoundedStream(DataStreamUtils.java:106)
> 2022-09-13T15:22:42.3924085Z Sep 13 15:22:42  at 
> org.apache.flink.test.streaming.runtime.SourceNAryInputChainingITCase.testDirectSourcesOnlyExecution(SourceNAryInputChainingITCase.java:89)
> 2022-09-13T15:22:42.3925493Z Sep 13 15:22:42  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-13T15:22:42.3926635Z Sep 13 15:22:42  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-13T15:22:42.3928378Z Sep 13 15:22:42  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-13T15:22:42.3964273Z Sep 13 15:22:42  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-13T15:22:42.3965054Z Sep 13 15:22:42  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-13T15:22:42.3965788Z Sep 13 15:22:42  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-13T15:22:42.3966508Z Sep 13 15:22:42  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-09-13T15:22:42.3967476Z Sep 13 15:22:42  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-09-13T15:22:42.3968432Z Sep 13 15:22:42  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-09-13T15:22:42.3969233Z Sep 13 15:22:42  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-09-13T15:22:42.3969871Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-13T15:22:42.3970534Z Sep 13 15:22:42  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-09-13T15:22:42.3971453Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-09-13T15:22:42.3972453Z Sep 13 15:22:42  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-09-13T15:22:42.3973193Z Sep 13 15:22:42  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-09-13T15:22:42.3973857Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-09-13T15:22:42.3974634Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-09-13T15:22:42.3975420Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-09-13T15:22:42.3976060Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-09-13T15:22:42.3976689Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-09-13T15:22:42.3977555Z Sep 13 15:22:42  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-13T15:22:42.3978248Z Sep 13 15:22:42  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-13T15:22:42.3978856Z Sep 13 15:22:42  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-09-13T15:22:42.3979696Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-13T15:22:42.3980716Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-09-13T15:22:42.3981785Z Sep 13 

[jira] [Commented] (FLINK-29296) OperatorCoordinatorHolder.create throws NPE

2022-10-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620874#comment-17620874
 ] 

Yingjie Cao commented on FLINK-29296:
-

I think it has already been fixed by 
https://issues.apache.org/jira/browse/FLINK-29576, so I am closing it. Feel free 
to reopen it if it still reproduces.

> OperatorCoordinatorHolder.create throws NPE
> ---
>
> Key: FLINK-29296
> URL: https://issues.apache.org/jira/browse/FLINK-29296
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Xingbo Huang
>Priority: Critical
>
> {code:java}
> 2022-09-13T15:22:42.3864318Z Sep 13 15:22:42 [ERROR] Tests run: 8, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 5.633 s <<< FAILURE! - in 
> org.apache.flink.test.streaming.runtime.SourceNAryInputChainingITCase
> 2022-09-13T15:22:42.3865377Z Sep 13 15:22:42 [ERROR] 
> org.apache.flink.test.streaming.runtime.SourceNAryInputChainingITCase.testDirectSourcesOnlyExecution
>   Time elapsed: 0.165 s  <<< ERROR!
> 2022-09-13T15:22:42.3867571Z Sep 13 15:22:42 java.lang.RuntimeException: 
> Failed to fetch next result
> 2022-09-13T15:22:42.3919112Z Sep 13 15:22:42  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2022-09-13T15:22:42.3920935Z Sep 13 15:22:42  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2022-09-13T15:22:42.3922442Z Sep 13 15:22:42  at 
> org.apache.flink.streaming.api.datastream.DataStreamUtils.collectBoundedStream(DataStreamUtils.java:106)
> 2022-09-13T15:22:42.3924085Z Sep 13 15:22:42  at 
> org.apache.flink.test.streaming.runtime.SourceNAryInputChainingITCase.testDirectSourcesOnlyExecution(SourceNAryInputChainingITCase.java:89)
> 2022-09-13T15:22:42.3925493Z Sep 13 15:22:42  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-13T15:22:42.3926635Z Sep 13 15:22:42  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-13T15:22:42.3928378Z Sep 13 15:22:42  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-13T15:22:42.3964273Z Sep 13 15:22:42  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-13T15:22:42.3965054Z Sep 13 15:22:42  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-13T15:22:42.3965788Z Sep 13 15:22:42  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-13T15:22:42.3966508Z Sep 13 15:22:42  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-09-13T15:22:42.3967476Z Sep 13 15:22:42  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-09-13T15:22:42.3968432Z Sep 13 15:22:42  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-09-13T15:22:42.3969233Z Sep 13 15:22:42  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-09-13T15:22:42.3969871Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-13T15:22:42.3970534Z Sep 13 15:22:42  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-09-13T15:22:42.3971453Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-09-13T15:22:42.3972453Z Sep 13 15:22:42  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-09-13T15:22:42.3973193Z Sep 13 15:22:42  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-09-13T15:22:42.3973857Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-09-13T15:22:42.3974634Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-09-13T15:22:42.3975420Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-09-13T15:22:42.3976060Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-09-13T15:22:42.3976689Z Sep 13 15:22:42  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-09-13T15:22:42.3977555Z Sep 13 15:22:42  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-13T15:22:42.3978248Z Sep 13 15:22:42  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-13T15:22:42.3978856Z Sep 13 15:22:42  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-09-13T15:22:42.3979696Z Sep 13 15:22:42  at 
> 

[jira] [Commented] (FLINK-29298) LocalBufferPool request buffer from NetworkBufferPool hanging

2022-10-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620848#comment-17620848
 ] 

Yingjie Cao commented on FLINK-29298:
-

[~Weijie Guo] Thanks for reporting this.

> LocalBufferPool request buffer from NetworkBufferPool hanging
> -
>
> Key: FLINK-29298
> URL: https://issues.apache.org/jira/browse/FLINK-29298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-09-14-10-52-15-259.png, 
> image-2022-09-14-10-58-45-987.png, image-2022-09-14-11-00-47-309.png
>
>
> In scenarios where buffer contention is fierce, the task can sometimes be 
> observed to hang. The thread dump shows that the task thread is blocked in 
> requestMemorySegmentBlocking forever. After investigating the dumped heap, I 
> found that the NetworkBufferPool actually has many buffers, but the 
> LocalBufferPool is still unavailable and has obtained no buffer.
> By looking at the code, I am sure this is a thread-race bug: when the 
> task thread polls out the last buffer in the LocalBufferPool and triggers the 
> onGlobalPoolAvailable callback itself, it skips this notification (as 
> the LocalBufferPool still appears available at that moment), which causes the 
> buffer pool to eventually become unavailable without ever registering a 
> callback with the NetworkBufferPool.
> The conditions for triggering the problem are quite strict, but I have 
> found a stable way to reproduce it; I will try to fix and verify this problem.
> !image-2022-09-14-10-52-15-259.png|width=1021,height=219!
> !image-2022-09-14-10-58-45-987.png|width=997,height=315!
> !image-2022-09-14-11-00-47-309.png|width=453,height=121!
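The skipped-notification pattern described above can be condensed into a deterministic single-threaded sketch. The class and field names below are hypothetical and greatly simplified from Flink's actual LocalBufferPool; the point is only the ordering bug:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model: a local pool must re-register a callback with the
// global pool whenever it runs out of buffers. Bug pattern: the
// availability callback fires while the local pool still *looks*
// available, so re-registration is skipped and the pool can starve.
public class SkippedNotificationSketch {
    static class LocalPool {
        final Deque<Object> buffers = new ArrayDeque<>();
        boolean callbackRegistered = false;

        // Callback from the global pool.
        void onGlobalPoolAvailable() {
            if (!buffers.isEmpty()) {
                return; // BUG: skips registration; nobody will notify us again
            }
            callbackRegistered = true;
        }

        Object poll() {
            return buffers.poll();
        }
    }

    public static void main(String[] args) {
        LocalPool pool = new LocalPool();
        pool.buffers.add(new Object()); // one buffer left
        // Race window: the callback observes a non-empty pool...
        pool.onGlobalPoolAvailable();
        // ...then the task thread drains the last buffer.
        pool.poll();
        // The pool is now empty but no callback is registered: it hangs.
        System.out.println("registered=" + pool.callbackRegistered
                + ", empty=" + pool.buffers.isEmpty());
    }
}
```

In the real multi-threaded pool the two steps interleave nondeterministically, which is why the conditions for triggering the hang are so strict.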



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29306) fail to check multiple flink-dist*.jar for config.sh

2022-10-20 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-29306:
---

Assignee: LiuZeshan

> fail to check multiple flink-dist*.jar for config.sh
> 
>
> Key: FLINK-29306
> URL: https://issues.apache.org/jira/browse/FLINK-29306
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.16.0
>Reporter: LiuZeshan
>Assignee: LiuZeshan
>Priority: Minor
>  Labels: pull-request-available
>
> The following shell command always makes FLINK_DIST_COUNT=1 
> ([config.sh|https://github.com/apache/flink/blob/db98322472cb65ca0358ec1cce7f9ef737198189/flink-dist/src/main/flink-bin/bin/config.sh#L35])
> {code:java}
> FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)" {code}
> so the following condition is always false and the check for multiple 
> flink-dist*.jar files never fires.
> {code:java}
> [[ "$FLINK_DIST_COUNT" -gt 1 ]] {code}
> examples:
> {code:java}
> # 
> FLINK_DIST=":/Users/lzs/.data/github/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/lib/flink-dist-1.17-SNAPSHOT.jar:/Users/lzs/.data/github/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/lib/flink-dist-1.17-xxx.jar"
> # echo "$FLINK_DIST" | wc -l
> 1{code}
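The example above shows the core mismatch: the jar paths are joined by ':' on a single line, so counting newlines always yields 1 regardless of how many jars are present. A plain-Java sketch of the distinction (illustrative names, not the actual config.sh fix):

```java
import java.util.Arrays;

// A colon-joined path list is one line, so a newline count is always 1;
// splitting on the separator reveals the real number of entries.
public class DistCount {
    static long lineCount(String s) {
        return s.lines().count();
    }

    static long entryCount(String s) {
        // Drop the empty entry produced by a leading ':'.
        return Arrays.stream(s.split(":"))
                .filter(e -> !e.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        String flinkDist = ":/lib/flink-dist-1.17-SNAPSHOT.jar"
                + ":/lib/flink-dist-1.17-xxx.jar";
        System.out.println(lineCount(flinkDist));  // 1
        System.out.println(entryCount(flinkDist)); // 2
    }
}
```

The shell-side equivalent of `entryCount` would be to split `$FLINK_DIST` on ':' (e.g. via `tr ':' '\n'`) before counting, rather than counting newlines in the raw string.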



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29565) In Flink per job mode, the logs printed by taskManager on the web UI will not be highlighted, because the log contents are annotated due to special symbols, which will

2022-10-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620835#comment-17620835
 ] 

Yingjie Cao commented on FLINK-29565:
-

[~1336399775] Thanks for reporting this. Do you have a solution for this?

> In Flink per job mode, the logs printed by taskManager on the web UI will not 
> be highlighted, because the log contents are annotated due to special 
> symbols, which will affect the use experience. For more information, see Fig
> 
>
> Key: FLINK-29565
> URL: https://issues.apache.org/jira/browse/FLINK-29565
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.3
>Reporter: wangshiwei
>Priority: Minor
> Attachments: image-2022-10-10-18-40-27-721.png, 
> image-2022-10-10-18-43-31-897.png, image-2022-10-10-18-43-53-713.png, 
> image-2022-10-10-18-45-17-228.png, image-2022-10-10-19-02-29-796.png, 
> image-2022-10-10-19-03-27-670.png
>
>
>  
> !image-2022-10-10-19-03-27-670.png|width=580,height=317!
> !image-2022-10-10-18-43-53-713.png|width=726,height=47!
> This kind of '/*' content appears in the log printed by the 
> logEnvironmentInfo method of the EnvironmentInformation class. The following 
> log lines are then treated as commented out and lose highlighting.
> *verification*
> !image-2022-10-10-18-45-17-228.png|width=880,height=161!
> After manually printing '*/' in the business code, the log renders normally
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29641) SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader

2022-10-17 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-29641:

Fix Version/s: 1.17.0
   1.16.1

> SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader
> --
>
> Key: FLINK-29641
> URL: https://issues.apache.org/jira/browse/FLINK-29641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.1
>
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42011=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=8433]
>  failed (not exclusively) due to 
> {{SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader}}. 
> The assert checking that the {{SortedMergeSubpartitionReader}} is in running 
> state fails.
> My suspicion is that the condition in 
> [SortMergeResultPartitionReadScheduler.mayTriggerReading|https://github.com/apache/flink/blob/87d4f70e49255b551d0106117978b1aa0747358c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java#L425-L428]
>  (or something related to that condition) needs to be reconsidered since 
> that's the only time {{isRunning}} is actually set to true.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29641) SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader

2022-10-17 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-29641:

Component/s: Runtime / Network
 Tests
 (was: Runtime / Coordination)

> SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader
> --
>
> Key: FLINK-29641
> URL: https://issues.apache.org/jira/browse/FLINK-29641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.1
>
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42011=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=8433]
>  failed (not exclusively) due to 
> {{SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader}}. 
> The assert checking that the {{SortedMergeSubpartitionReader}} is in running 
> state fails.
> My suspicion is that the condition in 
> [SortMergeResultPartitionReadScheduler.mayTriggerReading|https://github.com/apache/flink/blob/87d4f70e49255b551d0106117978b1aa0747358c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java#L425-L428]
>  (or something related to that condition) needs to be reconsidered since 
> that's the only time {{isRunning}} is actually set to true.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29298) LocalBufferPool request buffer from NetworkBufferPool hanging

2022-10-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-29298:
---

Assignee: Weijie Guo

> LocalBufferPool request buffer from NetworkBufferPool hanging
> -
>
> Key: FLINK-29298
> URL: https://issues.apache.org/jira/browse/FLINK-29298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-09-14-10-52-15-259.png, 
> image-2022-09-14-10-58-45-987.png, image-2022-09-14-11-00-47-309.png
>
>
> In scenarios where buffer contention is fierce, the task can sometimes be 
> observed to hang. The thread dump shows that the task thread is blocked in 
> requestMemorySegmentBlocking forever. After investigating the dumped heap, I 
> found that the NetworkBufferPool actually has many buffers, but the 
> LocalBufferPool is still unavailable and has obtained no buffer.
> By looking at the code, I am sure this is a thread-race bug: when the 
> task thread polls out the last buffer in the LocalBufferPool and triggers the 
> onGlobalPoolAvailable callback itself, it skips this notification (as 
> the LocalBufferPool still appears available at that moment), which causes the 
> buffer pool to eventually become unavailable without ever registering a 
> callback with the NetworkBufferPool.
> The conditions for triggering the problem are quite strict, but I have 
> found a stable way to reproduce it; I will try to fix and verify this problem.
> !image-2022-09-14-10-52-15-259.png|width=1021,height=219!
> !image-2022-09-14-10-58-45-987.png|width=997,height=315!
> !image-2022-09-14-11-00-47-309.png|width=453,height=121!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29546) UDF:Failed to compile split code, falling back to original code

2022-10-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-29546:

Component/s: Table SQL / Runtime
 (was: API / DataStream)

> UDF:Failed to compile split code, falling back to original code
> ---
>
> Key: FLINK-29546
> URL: https://issues.apache.org/jira/browse/FLINK-29546
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0
> Environment: my pom:
> 
>     org.apache.flink
>     flink-table-api-java-uber
> 
> 
>     org.apache.flink
>     flink-table-runtime
> 
> 
>     org.apache.flink
>     flink-table-planner-loader
> 
> jdk 1.8
>Reporter: Hui Wang
>Priority: Major
>
> 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN  
> org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile 
> split code, falling back to original code
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.compileGeneratedCode(AggregateWindowOperator.java:148)
>     at 
> org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:274)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.open(AggregateWindowOperator.java:139)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 15 common frames omitted
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 18 common frames omitted
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 
> 28: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
>     at 
> 

[jira] [Updated] (FLINK-29565) In Flink per job mode, the logs printed by taskManager on the web UI will not be highlighted, because the log contents are annotated due to special symbols, which will a

2022-10-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-29565:

Component/s: Runtime / Web Frontend
 (was: API / Core)

> In Flink per job mode, the logs printed by taskManager on the web UI will not 
> be highlighted, because the log contents are annotated due to special 
> symbols, which will affect the use experience. For more information, see Fig
> 
>
> Key: FLINK-29565
> URL: https://issues.apache.org/jira/browse/FLINK-29565
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.3
>Reporter: wangshiwei
>Priority: Minor
> Attachments: image-2022-10-10-18-40-27-721.png, 
> image-2022-10-10-18-43-31-897.png, image-2022-10-10-18-43-53-713.png, 
> image-2022-10-10-18-45-17-228.png, image-2022-10-10-19-02-29-796.png, 
> image-2022-10-10-19-03-27-670.png
>
>
>  
> !image-2022-10-10-19-03-27-670.png|width=580,height=317!
> !image-2022-10-10-18-43-53-713.png|width=726,height=47!
> This '/*' sequence appears in the output of the logEnvironmentInfo method of 
> the EnvironmentInformation class. The log lines that follow it are treated 
> as commented out and lose their highlighting.
> *verification*
> !image-2022-10-10-18-45-17-228.png|width=880,height=161!
> After manually printing '*/' in the business code, the log renders normally again.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29351) Enable input buffer floating for blocking shuffle

2022-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29351:
---

 Summary: Enable input buffer floating for blocking shuffle
 Key: FLINK-29351
 URL: https://issues.apache.org/jira/browse/FLINK-29351
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


At the input gate, Flink needs exclusive buffers for each input channel. For 
jobs with large parallelism, this easily causes "Insufficient number of 
network buffers" errors. This ticket aims to make all input network buffers 
floating for blocking shuffle, reducing the chance of that error. The change 
can also improve default blocking shuffle performance, because buffer 
floating increases buffer utilization.
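The effect on the hard buffer requirement can be sketched with back-of-the-envelope arithmetic. The method names and the floating-pool numbers below are illustrative assumptions, not Flink's actual accounting (the per-channel exclusive count is configurable and defaults to 2).

```java
class InputBufferSketch {
    // Hard buffer requirement with per-channel exclusive buffers: grows
    // linearly with the number of input channels.
    static long requiredWithExclusive(long numChannels, int exclusivePerChannel, int floating) {
        return numChannels * exclusivePerChannel + floating;
    }

    // With fully floating input buffers, the hard requirement is just the
    // (small) floating pool, independent of the channel count.
    static long requiredAllFloating(int floatingPoolSize) {
        return floatingPoolSize;
    }
}
```

With 32768 channels and 2 exclusive buffers each, the exclusive scheme needs tens of thousands of buffers up front, while a floating scheme only needs the shared pool.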



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode

2022-09-20 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-29299.
-
Resolution: Fixed

Fix via 

d3513d98953b0922e3dc753ef90806ed4e264926 on master

7367e358ccf34fb1e9ea2cea9d5b1f630b00e10c on 1.16

> Fix the network memory size calculation issue in fine-grained resource mode
> ---
>
> Key: FLINK-29299
> URL: https://issues.apache.org/jira/browse/FLINK-29299
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After FLINK-28663, one intermediate dataset can be consumed by multiple 
> consumers, so one vertex may consume the same intermediate dataset multiple 
> times. However, in fine-grained resource mode, the intermediate dataset is 
> currently used as the key when recording the network buffer size per input 
> gate, which means fewer network buffers than needed may be allocated when 
> two input gates of the same vertex consume the same intermediate dataset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode

2022-09-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-29299:

Issue Type: Bug  (was: Improvement)

> Fix the network memory size calculation issue in fine-grained resource mode
> ---
>
> Key: FLINK-29299
> URL: https://issues.apache.org/jira/browse/FLINK-29299
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After FLINK-28663, one intermediate dataset can be consumed by multiple 
> consumers, so one vertex may consume the same intermediate dataset multiple 
> times. However, in fine-grained resource mode, the intermediate dataset is 
> currently used as the key when recording the network buffer size per input 
> gate, which means fewer network buffers than needed may be allocated when 
> two input gates of the same vertex consume the same intermediate dataset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode

2022-09-14 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-29299:
---

Assignee: Yingjie Cao

> Fix the network memory size calculation issue in fine-grained resource mode
> ---
>
> Key: FLINK-29299
> URL: https://issues.apache.org/jira/browse/FLINK-29299
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Critical
> Fix For: 1.16.0
>
>
> After FLINK-28663, one intermediate dataset can be consumed by multiple 
> consumers, so one vertex may consume the same intermediate dataset multiple 
> times. However, in fine-grained resource mode, the intermediate dataset is 
> currently used as the key when recording the network buffer size per input 
> gate, which means fewer network buffers than needed may be allocated when 
> two input gates of the same vertex consume the same intermediate dataset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode

2022-09-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29299:
---

 Summary: Fix the network memory size calculation issue in 
fine-grained resource mode
 Key: FLINK-29299
 URL: https://issues.apache.org/jira/browse/FLINK-29299
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.16.0
Reporter: Yingjie Cao
 Fix For: 1.16.0


After FLINK-28663, one intermediate dataset can be consumed by multiple 
consumers, so one vertex may consume the same intermediate dataset multiple 
times. However, in fine-grained resource mode, the intermediate dataset is 
currently used as the key when recording the network buffer size per input 
gate, which means fewer network buffers than needed may be allocated when two 
input gates of the same vertex consume the same intermediate dataset.
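The under-counting can be illustrated with a toy accounting sketch. The method names and the map-based bookkeeping below are illustrative assumptions, not Flink's actual data structures.

```java
import java.util.HashMap;
import java.util.Map;

class NetworkMemorySketch {
    // Buggy accounting: keyed by dataset id, so a second gate reading the
    // same dataset overwrites the first gate's entry instead of adding to it.
    static int buggyTotal(String[] consumedDatasetPerGate, int buffersPerGate) {
        Map<String, Integer> perDataset = new HashMap<>();
        for (String datasetId : consumedDatasetPerGate) {
            perDataset.put(datasetId, buffersPerGate);
        }
        return perDataset.values().stream().mapToInt(Integer::intValue).sum();
    }

    // Corrected accounting: one contribution per input gate, regardless of
    // which intermediate dataset the gate consumes.
    static int fixedTotal(String[] consumedDatasetPerGate, int buffersPerGate) {
        return consumedDatasetPerGate.length * buffersPerGate;
    }
}
```

Two gates of one vertex both reading dataset "ds1" should reserve two gates' worth of buffers, but dataset-keyed bookkeeping reserves only one.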



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28942) Deadlock may occur when releasing readers for SortMergeResultPartition

2022-08-12 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28942.
-
Resolution: Fixed

Merged into master via f2fb6b20ec493a3af3f19a6f69f25e26ed226dda

> Deadlock may occur when releasing readers for SortMergeResultPartition
> ---
>
> Key: FLINK-28942
> URL: https://issues.apache.org/jira/browse/FLINK-28942
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After adding the logic of recycling buffers in CompositeBuffer in 
> https://issues.apache.org/jira/browse/FLINK-28373, reading data and 
> recycling buffers simultaneously can deadlock between the 
> SortMergeResultPartition lock and the SingleInputGate lock.
> In short, the deadlock may occur as follows:
> 1. SingleInputGate.getNextBufferOrEvent (holds the SingleInputGate lock) -> 
> CompositeBuffer.getFullBufferData -> CompositeBuffer.recycleBuffer -> waits 
> for the SortMergeResultPartition lock;
> 2. ResultPartitionManager.releasePartition (holds the SortMergeResultPartition 
> lock) -> SortMergeSubpartitionReader.notifyDataAvailable -> 
> SingleInputGate.notifyChannelNonEmpty -> waits for the SingleInputGate lock.
> The probability of this deadlock is very small, but we should fix the bug as 
> soon as possible.
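The two threads acquire the same pair of locks in opposite orders, which is the classic ABBA deadlock. The sketch below shows one standard remedy, deferring the cross-lock notification until the first lock is released; this is only an illustration of the pattern, not necessarily how the actual fix was implemented.

```java
class DeadlockFixSketch {
    final Object partitionLock = new Object(); // stand-in for the SortMergeResultPartition lock
    final Object gateLock = new Object();      // stand-in for the SingleInputGate lock
    boolean notified = false;

    // Deadlock-prone shape (shown as comments, do not run concurrently):
    //   task thread:    synchronized (gateLock)      { ... needs partitionLock ... }
    //   release thread: synchronized (partitionLock) { ... needs gateLock ...      }

    // Remedy: collect the work that needs the *other* lock while holding the
    // first one, then perform it after releasing it, so the two locks are
    // never held at the same time.
    void releasePartition() {
        Runnable notifyDownstream;
        synchronized (partitionLock) {
            // ... release the subpartition reader's state here ...
            notifyDownstream = () -> {
                synchronized (gateLock) {
                    notified = true; // stand-in for notifyChannelNonEmpty()
                }
            };
        }
        // Runs without holding partitionLock: no nested lock acquisition.
        notifyDownstream.run();
    }
}
```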



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28942) Deadlock may occur when releasing readers for SortMergeResultPartition

2022-08-12 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28942:
---

Assignee: Yuxin Tan

> Deadlock may occur when releasing readers for SortMergeResultPartition
> ---
>
> Key: FLINK-28942
> URL: https://issues.apache.org/jira/browse/FLINK-28942
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After adding the logic of recycling buffers in CompositeBuffer in 
> https://issues.apache.org/jira/browse/FLINK-28373, reading data and 
> recycling buffers simultaneously can deadlock between the 
> SortMergeResultPartition lock and the SingleInputGate lock.
> In short, the deadlock may occur as follows:
> 1. SingleInputGate.getNextBufferOrEvent (holds the SingleInputGate lock) -> 
> CompositeBuffer.getFullBufferData -> CompositeBuffer.recycleBuffer -> waits 
> for the SortMergeResultPartition lock;
> 2. ResultPartitionManager.releasePartition (holds the SortMergeResultPartition 
> lock) -> SortMergeSubpartitionReader.notifyDataAvailable -> 
> SingleInputGate.notifyChannelNonEmpty -> waits for the SingleInputGate lock.
> The probability of this deadlock is very small, but we should fix the bug as 
> soon as possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28942) Deadlock may occur when releasing readers for SortMergeResultPartition

2022-08-12 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28942:

Fix Version/s: 1.16.0

> Deadlock may occur when releasing readers for SortMergeResultPartition
> ---
>
> Key: FLINK-28942
> URL: https://issues.apache.org/jira/browse/FLINK-28942
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After adding the logic of recycling buffers in CompositeBuffer in 
> https://issues.apache.org/jira/browse/FLINK-28373, reading data and 
> recycling buffers simultaneously can deadlock between the 
> SortMergeResultPartition lock and the SingleInputGate lock.
> In short, the deadlock may occur as follows:
> 1. SingleInputGate.getNextBufferOrEvent (holds the SingleInputGate lock) -> 
> CompositeBuffer.getFullBufferData -> CompositeBuffer.recycleBuffer -> waits 
> for the SortMergeResultPartition lock;
> 2. ResultPartitionManager.releasePartition (holds the SortMergeResultPartition 
> lock) -> SortMergeSubpartitionReader.notifyDataAvailable -> 
> SingleInputGate.notifyChannelNonEmpty -> waits for the SingleInputGate lock.
> The probability of this deadlock is very small, but we should fix the bug as 
> soon as possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28623) Optimize the use of off heap memory by blocking and hybrid shuffle reader

2022-08-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28623:
---

Assignee: Weijie Guo

> Optimize the use of off heap memory by blocking and hybrid shuffle reader
> -
>
> Key: FLINK-28623
> URL: https://issues.apache.org/jira/browse/FLINK-28623
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, each FileReader (PartitionFileReader or 
> HsSubpartitionFileReaderImpl) internally allocates an 8-byte header buffer. 
> Besides, PartitionFileReader also has a 12-byte indexEntryBuf. Because a 
> FileReader is created per subpartition, with very large parallelism and 
> many slots on each TM the memory occupied can reach the MB level. In fact, 
> all FileReaders of the same ResultPartition read data in a single thread, 
> so allocating one header buffer per ResultPartition is enough.
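The ownership change can be sketched as follows. The class and field names are illustrative, not Flink's actual reader classes; the point is that readers borrow one buffer owned by the partition instead of each allocating their own.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class SharedHeaderBufferSketch {
    static final int HEADER_SIZE = 8;

    /** All readers of one partition are driven by a single IO thread, so one
     *  header buffer per partition is enough. */
    static class ResultPartition {
        final ByteBuffer sharedHeader = ByteBuffer.allocate(HEADER_SIZE);
        final List<SubpartitionReader> readers = new ArrayList<>();

        SubpartitionReader createReader() {
            SubpartitionReader reader = new SubpartitionReader(sharedHeader);
            readers.add(reader);
            return reader;
        }
    }

    static class SubpartitionReader {
        final ByteBuffer header; // borrowed from the partition, not owned

        SubpartitionReader(ByteBuffer header) {
            this.header = header;
        }
    }

    /** Header memory before vs. after the change. */
    static long headerBytes(int numReaders, boolean shared) {
        return shared ? HEADER_SIZE : (long) numReaders * HEADER_SIZE;
    }
}
```

At 32768 subpartitions the per-reader scheme needs 256 KB of header buffers per partition, while the shared scheme needs 8 bytes.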



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28623) Optimize the use of off heap memory by blocking and hybrid shuffle reader

2022-08-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28623.
-
Fix Version/s: 1.16.0
   Resolution: Fixed

Merged into master via 87d4f70e49255b551d0106117978b1aa0747358c

> Optimize the use of off heap memory by blocking and hybrid shuffle reader
> -
>
> Key: FLINK-28623
> URL: https://issues.apache.org/jira/browse/FLINK-28623
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, each FileReader (PartitionFileReader or 
> HsSubpartitionFileReaderImpl) internally allocates an 8-byte header buffer. 
> Besides, PartitionFileReader also has a 12-byte indexEntryBuf. Because a 
> FileReader is created per subpartition, with very large parallelism and 
> many slots on each TM the memory occupied can reach the MB level. In fact, 
> all FileReaders of the same ResultPartition read data in a single thread, 
> so allocating one header buffer per ResultPartition is enough.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28374) Some further improvements of blocking shuffle

2022-08-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-28374.
---
Resolution: Fixed

> Some further improvements of blocking shuffle
> -
>
> Key: FLINK-28374
> URL: https://issues.apache.org/jira/browse/FLINK-28374
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
> Fix For: 1.16.0
>
>
> This is an umbrella issue for sort-shuffle Improvements.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28373) Read a full buffer of data per file IO read request for sort-shuffle

2022-08-09 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28373.
-
Resolution: Fixed

Merged into master via d6a47d897a9a4753c800b39adb17c06e154422cc

> Read a full buffer of data per file IO read request for sort-shuffle
> 
>
> Key: FLINK-28373
> URL: https://issues.apache.org/jira/browse/FLINK-28373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, for sort blocking shuffle, the corresponding data readers read 
> shuffle data at buffer granularity. Before compression, each buffer is 32K 
> by default; after compression the size becomes smaller (possibly less than 
> 10K). For file IO, this is quite small. To achieve better performance and 
> reduce IOPS, we can read more data per IO read request and parse buffer 
> headers and data in memory.
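The batched-read idea can be sketched as reading one large chunk per IO request and slicing out the individual buffers in memory. The frame layout here (a 4-byte length prefix followed by the payload) is an illustrative assumption, not Flink's actual buffer header format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class BatchedReadSketch {
    /**
     * Parses all complete frames out of one large chunk fetched by a single
     * IO read request, instead of issuing one read per small buffer.
     */
    static List<byte[]> parseFrames(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (chunk.remaining() >= Integer.BYTES) {
            chunk.mark();
            int length = chunk.getInt();
            if (chunk.remaining() < length) {
                chunk.reset(); // partial frame: finish it with the next read
                break;
            }
            byte[] payload = new byte[length];
            chunk.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```

One sequential multi-megabyte read followed by in-memory parsing replaces many sub-10K random reads, which is what reduces IOPS.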



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28380) Produce one intermediate dataset for multiple consumers consuming the same data

2022-08-08 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-28380.
---
Resolution: Fixed

Merged into master via 60bc87c0b83149c4f19d7e54af2d967087a277fb

> Produce one intermediate dataset for multiple consumers consuming the same 
> data
> ---
>
> Key: FLINK-28380
> URL: https://issues.apache.org/jira/browse/FLINK-28380
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Runtime / Coordination, Runtime 
> / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, if one output of an upstream job vertex is consumed by multiple 
> downstream job vertices, the upstream vertex produces multiple datasets. For 
> blocking shuffle, this means serializing and persisting the same data 
> multiple times. This ticket aims to optimize that behavior so that the 
> upstream job vertex produces one dataset which is read by multiple 
> downstream vertices.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28663) Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

2022-08-08 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28663.
-
Fix Version/s: 1.16.0
   Resolution: Fixed

Merged into master via 72405361610f1576e0f57ea4e957fafbbbaf0910

> Allow multiple downstream consumer job vertices sharing the same intermediate 
> dataset at scheduler side
> ---
>
> Key: FLINK-28663
> URL: https://issues.apache.org/jira/browse/FLINK-28663
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, one intermediate dataset can only be consumed by one downstream 
> consumer vertex. If multiple consumer vertices consume the same output of 
> the same upstream vertex, multiple intermediate datasets are produced. We 
> can optimize this behavior to produce only one intermediate dataset shared 
> by multiple consumer vertices. As the first step, we should allow multiple 
> downstream consumer job vertices to share the same intermediate dataset on 
> the scheduler side. (Note that this optimization only works for blocking 
> shuffle, because a pipelined shuffle result partition cannot be consumed 
> multiple times.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28823) Enlarge the max requested buffers for SortMergeResultPartitionReadScheduler

2022-08-08 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28823.
-
Resolution: Fixed

Merged into master via ca45a28205b424c2c77a21366fb29a67457672ff

> Enlarge the max requested buffers for SortMergeResultPartitionReadScheduler
> ---
>
> Key: FLINK-28823
> URL: https://issues.apache.org/jira/browse/FLINK-28823
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> The maximum number of requested buffers in 
> SortMergeResultPartitionReadScheduler is determined by the number of 
> subpartitions, which may be insufficient in some scenarios, such as 
> speculative execution and partition reuse.
> Therefore, we should increase the maximum number of requested buffers to 
> make full use of available memory when TM memory is sufficient.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28828) Sorting all unfinished readers in batches at one time in SortMergeResultPartitionReadScheduler

2022-08-08 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28828:
---

Assignee: Yuxin Tan

> Sorting all unfinished readers in batches at one time in 
> SortMergeResultPartitionReadScheduler
> --
>
> Key: FLINK-28828
> URL: https://issues.apache.org/jira/browse/FLINK-28828
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when reading data in SortMergeResultPartitionReadScheduler, a 
> reader is re-added to the priority queue immediately. However, the data just 
> read from that reader may not have been consumed yet, which ranks the reader 
> later in the queue and is unfavorable for sequential reading.
> To solve this, after reading the data we should sort all unfinished readers 
> in batches at one time, that is, add all unfinished readers back to the 
> priority queue together, which is more conducive to sequential reading.
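The batching change can be sketched with a toy priority queue of readers ordered by their next file offset. The Reader class, the fixed 32K advance, and the round-based loop are illustrative assumptions, not the scheduler's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class BatchRequeueSketch {
    static class Reader implements Comparable<Reader> {
        final String name;
        long nextOffset;   // next file offset this reader will touch
        int remaining;     // buffers this reader still has to read

        Reader(String name, long nextOffset, int remaining) {
            this.name = name;
            this.nextOffset = nextOffset;
            this.remaining = remaining;
        }

        @Override
        public int compareTo(Reader other) {
            return Long.compare(nextOffset, other.nextOffset);
        }
    }

    /** Serves every queued reader once, then re-adds all unfinished readers
     *  in one batch instead of immediately after each read. */
    static List<String> runOneRound(PriorityQueue<Reader> queue) {
        List<String> serviceOrder = new ArrayList<>();
        List<Reader> unfinished = new ArrayList<>();
        while (!queue.isEmpty()) {
            Reader reader = queue.poll();
            serviceOrder.add(reader.name);  // read one buffer from this reader
            reader.nextOffset += 32 * 1024; // the file offset advances
            if (--reader.remaining > 0) {
                unfinished.add(reader);     // deliberately NOT re-queued yet
            }
        }
        queue.addAll(unfinished); // batch re-insertion after the whole round
        return serviceOrder;
    }
}
```

Because each round drains the queue in offset order before anything is re-inserted, readers are always served in ascending file-offset order within a round, which keeps the disk access sequential.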



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28789) TPC-DS tests failed due to release input gate for task failure

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-28789.
---
Resolution: Fixed

>  TPC-DS tests failed  due to release input gate for task failure
> 
>
> Key: FLINK-28789
> URL: https://issues.apache.org/jira/browse/FLINK-28789
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Leonard Xu
>Assignee: Yuxin Tan
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> switched from CANCELING to CANCELED.
> 2022-08-03 08:03:02,776 INFO  org.apache.flink.runtime.taskmanager.Task   
>  [] - Freeing task resources for MultipleInput[2212] -> 
> Calc[2191] -> HashAggregate[2192] (8/8)#1 
> (cf5f33b100f0efb21b9ff8d27a78cd8e_d806bb3f5ea308ac3f1df304a96163b4_7_1).
> 2022-08-03 08:03:02,776 ERROR org.apache.flink.runtime.taskmanager.Task   
>  [] - Failed to release input gate for task MultipleInput[2212] 
> -> Calc[2191] -> HashAggregate[2192] (8/8)#1.
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, decrement: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.recycleBuffer(ReadOnlySlicedNetworkBuffer.java:123)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.CompositeBuffer.recycleBuffer(CompositeBuffer.java:70)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_332]
>   at 
> org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.releaseInternal(SortMergeSubpartitionReader.java:181)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.releaseAllResources(SortMergeSubpartitionReader.java:163)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.releaseAllResources(LocalInputChannel.java:341)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.close(SingleInputGate.java:667)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.close(InputGateWithMetrics.java:140)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.closeAllInputGates(Task.java:1010) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:975) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:820) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
> 2022-08-03 08:03:02,778 WARN  org.apache.flink.metrics.MetricGroup 
> {code}
> The failed CI link: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39152=results



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28789) TPC-DS tests failed due to release input gate for task failure

2022-08-07 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17576513#comment-17576513
 ] 

Yingjie Cao commented on FLINK-28789:
-

Closing this issue, feel free to reopen it if the problem still exists.

>  TPC-DS tests failed  due to release input gate for task failure
> 
>
> Key: FLINK-28789
> URL: https://issues.apache.org/jira/browse/FLINK-28789
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Leonard Xu
>Assignee: Yuxin Tan
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> switched from CANCELING to CANCELED.
> 2022-08-03 08:03:02,776 INFO  org.apache.flink.runtime.taskmanager.Task   
>  [] - Freeing task resources for MultipleInput[2212] -> 
> Calc[2191] -> HashAggregate[2192] (8/8)#1 
> (cf5f33b100f0efb21b9ff8d27a78cd8e_d806bb3f5ea308ac3f1df304a96163b4_7_1).
> 2022-08-03 08:03:02,776 ERROR org.apache.flink.runtime.taskmanager.Task   
>  [] - Failed to release input gate for task MultipleInput[2212] 
> -> Calc[2191] -> HashAggregate[2192] (8/8)#1.
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, decrement: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.recycleBuffer(ReadOnlySlicedNetworkBuffer.java:123)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.CompositeBuffer.recycleBuffer(CompositeBuffer.java:70)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_332]
>   at 
> org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.releaseInternal(SortMergeSubpartitionReader.java:181)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.releaseAllResources(SortMergeSubpartitionReader.java:163)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.releaseAllResources(LocalInputChannel.java:341)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.close(SingleInputGate.java:667)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.close(InputGateWithMetrics.java:140)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.closeAllInputGates(Task.java:1010) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:975) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:820) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
> 2022-08-03 08:03:02,778 WARN  org.apache.flink.metrics.MetricGroup 
> {code}
> The failed CI link: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39152=results





[jira] [Resolved] (FLINK-28828) Sorting all unfinished readers in batches at one time in SortMergeResultPartitionReadScheduler

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28828.
-
Resolution: Fixed

Merged into master via 07a309ad93444097049aa65944e2ca4fbff37e42

> Sorting all unfinished readers in batches at one time in 
> SortMergeResultPartitionReadScheduler
> --
>
> Key: FLINK-28828
> URL: https://issues.apache.org/jira/browse/FLINK-28828
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when reading data in SortMergeResultPartitionReadScheduler, a 
> reader is re-added to the priority queue immediately after being read. 
> However, the data read from that reader may not have been consumed yet, so 
> the reader ends up ranked later in the queue, which is unfavorable to 
> sequential reading.
> To solve the issue, after reading the data we should sort all unfinished 
> readers in batches at one time, that is, add all unfinished readers back to 
> the priority queue together, which is more conducive to sequential reading.
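
The batching idea can be sketched with a small model (class and method names here are illustrative, not Flink's actual API): instead of re-inserting each unfinished reader into the priority queue right after it is read, all unfinished readers of a round are re-inserted together, so they are re-ordered by their new file offsets at one time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of the scheduler's reader queue; names are illustrative.
public class BatchSortSketch {

    /** One reader per subpartition: current file offset and remaining regions. */
    static final class Reader {
        final int id;
        long nextOffset;
        int regionsLeft;

        Reader(int id, long nextOffset, int regionsLeft) {
            this.id = id;
            this.nextOffset = nextOffset;
            this.regionsLeft = regionsLeft;
        }
    }

    /** Reads one region per reader per round, re-adding unfinished readers in one batch. */
    static List<Integer> readOrder(long[] startOffsets, int[] regionCounts) {
        PriorityQueue<Reader> queue =
                new PriorityQueue<>((a, b) -> Long.compare(a.nextOffset, b.nextOffset));
        for (int i = 0; i < startOffsets.length; i++) {
            queue.add(new Reader(i, startOffsets[i], regionCounts[i]));
        }
        List<Integer> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<Reader> unfinished = new ArrayList<>();
            Reader r;
            while ((r = queue.poll()) != null) {
                order.add(r.id);
                r.nextOffset += 100; // pretend one 100-byte region was read
                if (--r.regionsLeft > 0) {
                    unfinished.add(r); // do NOT re-insert immediately
                }
            }
            queue.addAll(unfinished); // one batch re-sort per round
        }
        return order;
    }

    public static void main(String[] args) {
        // Two readers with interleaved offsets alternate cleanly across rounds.
        System.out.println(readOrder(new long[]{0L, 50L}, new int[]{2, 2}));
    }
}
```

Because every unfinished reader re-enters the queue with its updated offset in the same round, no reader can repeatedly jump ahead of the others between polls.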





[jira] [Resolved] (FLINK-28826) Avoid notifying too frequently when recycling buffers for BatchShuffleReadBufferPool

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28826.
-
Resolution: Fixed

> Avoid notifying too frequently when recycling buffers for 
> BatchShuffleReadBufferPool
> 
>
> Key: FLINK-28826
> URL: https://issues.apache.org/jira/browse/FLINK-28826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> When recycling buffers in BatchShuffleReadBufferPool, the number of available 
> buffers may grow larger than numBuffersPerRequest, which can trigger overly 
> frequent notifications that buffers are available.
> We should tighten the condition under which the buffer-available notification 
> is sent to solve this problem.
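
A minimal sketch of the idea (the class and counters below are illustrative, not the real BatchShuffleReadBufferPool API): readers request buffers in batches of numBuffersPerRequest, so waking waiters for every single recycled buffer is wasted work; notifying only once a full batch is available cuts the wakeups dramatically.

```java
// Toy model of the recycling path; names are illustrative, not Flink's code.
public class RecycleNotifySketch {
    private final int numBuffersPerRequest;
    private int availableBuffers = 0;
    int notifications = 0; // stands in for notifyAll() on the pool lock

    RecycleNotifySketch(int numBuffersPerRequest) {
        this.numBuffersPerRequest = numBuffersPerRequest;
    }

    /** Old behavior: wake waiting readers on every recycled buffer. */
    synchronized void recycleNaive() {
        availableBuffers++;
        notifications++;
    }

    /** Fixed behavior: wake waiting readers only once a full request can be served. */
    synchronized void recycleBatched() {
        availableBuffers++;
        if (availableBuffers >= numBuffersPerRequest) {
            notifications++;
        }
    }
}
```

With numBuffersPerRequest = 8, recycling 8 buffers produces 8 wakeups in the naive version but only 1 in the batched one.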





[jira] [Commented] (FLINK-28826) Avoid notifying too frequently when recycling buffers for BatchShuffleReadBufferPool

2022-08-07 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17576375#comment-17576375
 ] 

Yingjie Cao commented on FLINK-28826:
-

Merged into master via 05de0a295efe06ff342a211071fc3810fdd53a2e.

> Avoid notifying too frequently when recycling buffers for 
> BatchShuffleReadBufferPool
> 
>
> Key: FLINK-28826
> URL: https://issues.apache.org/jira/browse/FLINK-28826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> When recycling buffers in BatchShuffleReadBufferPool, the number of available 
> buffers may grow larger than numBuffersPerRequest, which can trigger overly 
> frequent notifications that buffers are available.
> We should tighten the condition under which the buffer-available notification 
> is sent to solve this problem.





[jira] [Assigned] (FLINK-28826) Avoid notifying too frequently when recycling buffers for BatchShuffleReadBufferPool

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28826:
---

Assignee: Yuxin Tan

> Avoid notifying too frequently when recycling buffers for 
> BatchShuffleReadBufferPool
> 
>
> Key: FLINK-28826
> URL: https://issues.apache.org/jira/browse/FLINK-28826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> When recycling buffers in BatchShuffleReadBufferPool, the number of available 
> buffers may grow larger than numBuffersPerRequest, which can trigger overly 
> frequent notifications that buffers are available.
> We should tighten the condition under which the buffer-available notification 
> is sent to solve this problem.





[jira] [Updated] (FLINK-28826) Avoid notifying too frequently when recycling buffers for BatchShuffleReadBufferPool

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28826:

Fix Version/s: 1.16.0

> Avoid notifying too frequently when recycling buffers for 
> BatchShuffleReadBufferPool
> 
>
> Key: FLINK-28826
> URL: https://issues.apache.org/jira/browse/FLINK-28826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> When recycling buffers in BatchShuffleReadBufferPool, the number of available 
> buffers may grow larger than numBuffersPerRequest, which can trigger overly 
> frequent notifications that buffers are available.
> We should tighten the condition under which the buffer-available notification 
> is sent to solve this problem.





[jira] [Updated] (FLINK-28823) Enlarge the max requested buffers for SortMergeResultPartitionReadScheduler

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28823:

Fix Version/s: 1.16.0

> Enlarge the max requested buffers for SortMergeResultPartitionReadScheduler
> ---
>
> Key: FLINK-28823
> URL: https://issues.apache.org/jira/browse/FLINK-28823
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> The maximum number of requested buffers in SortMergeResultPartitionReadScheduler 
> is determined by the number of subpartitions, which may be insufficient in some 
> scenarios, such as speculative execution and partition reuse.
> Therefore, we should increase the maximum number of requested buffers to make 
> full use of available memory when TM memory is sufficient.
>  
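
The enlargement can be pictured as a formula (method and parameter names below are illustrative, not Flink's actual code): keep the subpartition-based bound, but also derive a bound from the memory actually available for reads, and take the larger of the two.

```java
// Illustrative formula only; names do not match Flink's actual implementation.
public class MaxBuffersSketch {

    /**
     * The old cap was proportional to the subpartition count alone; also
     * considering a memory-based bound lets readers use spare TM memory.
     */
    static int maxRequestedBuffers(
            int numSubpartitions,
            int buffersPerSubpartition,
            long availableReadMemoryBytes,
            int bufferSizeBytes) {
        int bySubpartitions = numSubpartitions * buffersPerSubpartition;
        int byMemory = (int) (availableReadMemoryBytes / bufferSizeBytes);
        return Math.max(bySubpartitions, byMemory);
    }
}
```

For example, with 8 subpartitions at 2 buffers each but 32 MB of read memory and 32 KB buffers, the memory-based bound (1024 buffers) dominates the subpartition-based one (16).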





[jira] [Assigned] (FLINK-28823) Enlarge the max requested buffers for SortMergeResultPartitionReadScheduler

2022-08-07 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28823:
---

Assignee: Yuxin Tan

> Enlarge the max requested buffers for SortMergeResultPartitionReadScheduler
> ---
>
> Key: FLINK-28823
> URL: https://issues.apache.org/jira/browse/FLINK-28823
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> The maximum number of requested buffers in SortMergeResultPartitionReadScheduler 
> is determined by the number of subpartitions, which may be insufficient in some 
> scenarios, such as speculative execution and partition reuse.
> Therefore, we should increase the maximum number of requested buffers to make 
> full use of available memory when TM memory is sufficient.
>  





[jira] [Commented] (FLINK-28789) TPC-DS tests failed due to release input gate for task failure

2022-08-04 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575360#comment-17575360
 ] 

Yingjie Cao commented on FLINK-28789:
-

Though we still do not know the root cause, after reverting FLINK-28373 and 
testing multiple times on my own Azure account, the issue seems resolved. For CI 
stability, I am reverting FLINK-28373; let's see if that solves the problem.

> TPC-DS tests failed due to release input gate for task failure
> 
>
> Key: FLINK-28789
> URL: https://issues.apache.org/jira/browse/FLINK-28789
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Leonard Xu
>Assignee: Yuxin Tan
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> switched from CANCELING to CANCELED.
> 2022-08-03 08:03:02,776 INFO  org.apache.flink.runtime.taskmanager.Task   
>  [] - Freeing task resources for MultipleInput[2212] -> 
> Calc[2191] -> HashAggregate[2192] (8/8)#1 
> (cf5f33b100f0efb21b9ff8d27a78cd8e_d806bb3f5ea308ac3f1df304a96163b4_7_1).
> 2022-08-03 08:03:02,776 ERROR org.apache.flink.runtime.taskmanager.Task   
>  [] - Failed to release input gate for task MultipleInput[2212] 
> -> Calc[2191] -> HashAggregate[2192] (8/8)#1.
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, decrement: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.recycleBuffer(ReadOnlySlicedNetworkBuffer.java:123)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.buffer.CompositeBuffer.recycleBuffer(CompositeBuffer.java:70)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_332]
>   at 
> org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.releaseInternal(SortMergeSubpartitionReader.java:181)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.releaseAllResources(SortMergeSubpartitionReader.java:163)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.releaseAllResources(LocalInputChannel.java:341)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.close(SingleInputGate.java:667)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.close(InputGateWithMetrics.java:140)
>  ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.closeAllInputGates(Task.java:1010) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:975) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:820) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> [flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
> 2022-08-03 08:03:02,778 WARN  org.apache.flink.metrics.MetricGroup 
> {code}
> The failed CI link: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39152=results





[jira] [Commented] (FLINK-28382) Introduce new compression algorithms of higher compression ratio

2022-08-02 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17574075#comment-17574075
 ] 

Yingjie Cao commented on FLINK-28382:
-

Merged into master via 47d0b6d26c052a817a66f7b719eecf01387cb0d3

> Introduce new compression algorithms of higher compression ratio
> 
>
> Key: FLINK-28382
> URL: https://issues.apache.org/jira/browse/FLINK-28382
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, we use LZ4 for shuffle data compression, which is a good balance 
> between IO optimization and CPU consumption. But in some scenarios, IO becomes 
> the bottleneck and storage space is limited (especially in Kubernetes 
> environments). For these cases, we need compression algorithms with a higher 
> compression ratio to further reduce IO.
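
Flink's shuffle codecs live behind its own abstractions, but the ratio-versus-CPU trade-off itself can be illustrated with the JDK's built-in Deflater (used here purely as a stand-in; it is not the codec Flink uses): a higher compression level spends more CPU to emit fewer bytes.

```java
import java.util.zip.Deflater;

// Stand-in demonstration of compression-level trade-offs using the JDK codec.
public class CompressionRatioSketch {

    /** Compress data at the given level and return the compressed size in bytes. */
    static int compressedSize(byte[] data, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(data);
        deflater.finish();
        byte[] out = new byte[data.length * 2 + 64];
        int n = 0;
        while (!deflater.finished()) {
            n += deflater.deflate(out, n, out.length - n);
        }
        deflater.end();
        return n;
    }

    public static void main(String[] args) {
        byte[] data = new byte[256 * 1024];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) (i % 64); // repeating, highly compressible pattern
        }
        int fast = compressedSize(data, Deflater.BEST_SPEED);
        int small = compressedSize(data, Deflater.BEST_COMPRESSION);
        // Higher level -> fewer bytes written to disk/network, more CPU spent.
        System.out.println(fast + " bytes (fast) vs " + small + " bytes (small)");
    }
}
```

When IO or storage is the bottleneck, paying the extra CPU for the smaller output is the better trade, which is exactly the motivation for offering higher-ratio algorithms.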





[jira] [Resolved] (FLINK-28382) Introduce new compression algorithms of higher compression ratio

2022-08-02 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28382.
-
Resolution: Fixed

> Introduce new compression algorithms of higher compression ratio
> 
>
> Key: FLINK-28382
> URL: https://issues.apache.org/jira/browse/FLINK-28382
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, we use LZ4 for shuffle data compression, which is a good balance 
> between IO optimization and CPU consumption. But in some scenarios, IO becomes 
> the bottleneck and storage space is limited (especially in Kubernetes 
> environments). For these cases, we need compression algorithms with a higher 
> compression ratio to further reduce IO.





[jira] [Resolved] (FLINK-28519) Fix the bug that SortMergeResultPartitionReadScheduler may not read data sequentially

2022-07-29 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao resolved FLINK-28519.
-
Resolution: Fixed

Merged into master via f88489a6af42638679429df3fdb4818c278cacbf

> Fix the bug that SortMergeResultPartitionReadScheduler may not read data 
> sequentially
> -
>
> Key: FLINK-28519
> URL: https://issues.apache.org/jira/browse/FLINK-28519
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, the SortMergeResultPartitionReadScheduler always gets all active 
> subpartition readers and reads at most one data region for each of them. It is 
> common that some subpartitions are requested before others and their region 
> indexes are ahead of others. If all region data of a subpartition can be read 
> in one round, some subpartition readers will always be ahead of others, which 
> causes random IO. This patch fixes the problem by polling one subpartition 
> reader at a time.
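
The effect on IO ordering can be shown with a toy model (illustrative names, not Flink's code): when regions of two subpartitions interleave on disk, polling one reader per round keeps the file offset monotonically increasing, whereas draining all regions of the head reader first forces backward seeks.

```java
import java.util.PriorityQueue;

// Toy model counting backward seeks for two polling strategies.
public class SequentialReadSketch {

    /**
     * regionOffsets[r] lists the file offsets of reader r's regions in order.
     * Returns how many times the read position jumped backwards in the file.
     */
    static int backwardSeeks(long[][] regionOffsets, boolean onePerPoll) {
        // Queue entries are {readerIndex, nextRegionIndex}, ordered by file offset.
        PriorityQueue<int[]> queue = new PriorityQueue<>(
                (a, b) -> Long.compare(regionOffsets[a[0]][a[1]], regionOffsets[b[0]][b[1]]));
        for (int r = 0; r < regionOffsets.length; r++) {
            queue.add(new int[]{r, 0});
        }
        long pos = -1;
        int seeks = 0;
        while (!queue.isEmpty()) {
            int[] cur = queue.poll();
            // Either read one region per poll, or drain the reader completely.
            int count = onePerPoll ? 1 : regionOffsets[cur[0]].length - cur[1];
            for (int i = 0; i < count; i++) {
                long off = regionOffsets[cur[0]][cur[1]++];
                if (off < pos) {
                    seeks++; // jumped backwards in the file
                }
                pos = off;
            }
            if (cur[1] < regionOffsets[cur[0]].length) {
                queue.add(cur); // re-queue with the updated offset
            }
        }
        return seeks;
    }
}
```

With reader 0's regions at offsets {0, 200, 400} and reader 1's at {100, 300, 500}, per-region polling visits 0, 100, 200, 300, 400, 500 with no backward seek, while drain-first reads 0, 200, 400 and then has to seek back to 100.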





[jira] [Closed] (FLINK-28448) BoundedDataTestBase has potential problem when compression is enable

2022-07-27 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-28448.
---
Resolution: Fixed

Merged into master via 1b012cb2c402a91be58e8b201ffd1b6d38b58ef9 and 
a4ad7814009f3e799f198d54f24a055894b1ebce

> BoundedDataTestBase has potential problem when compression is enable
> 
>
> Key: FLINK-28448
> URL: https://issues.apache.org/jira/browse/FLINK-28448
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> BoundedDataTestBase has a potential problem when compression is enabled; I'm 
> sure there is a bug in this part of the code. Unfortunately, the test currently 
> passes because the data generated in the test cannot be compressed by LZ4 
> (i.e., the compressed size is larger than the original size), which makes the 
> compression-enabled branch behave the same as the compression-disabled one, so 
> the bug didn't surface.
> I think there are two steps of work that need to be done:
> 1. Fix the incorrect code
> 2. Generate data that can be properly compressed
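
Why random test data hides the bug can be demonstrated with any general-purpose codec (the JDK's Deflater is used here as a stand-in for LZ4): random bytes do not shrink, so a "compressed" code path silently degenerates into the uncompressed one, while a repeating pattern genuinely compresses and would exercise the buggy branch.

```java
import java.util.Random;
import java.util.zip.Deflater;

// Demonstrates compressible vs incompressible test data (JDK codec stand-in).
public class TestDataSketch {

    /** Compress data with default settings and return the compressed size. */
    static int deflate(byte[] data) {
        Deflater d = new Deflater();
        d.setInput(data);
        d.finish();
        byte[] out = new byte[data.length * 2 + 64];
        int n = 0;
        while (!d.finished()) {
            n += d.deflate(out, n, out.length - n);
        }
        d.end();
        return n;
    }

    public static void main(String[] args) {
        byte[] random = new byte[8192];
        new Random(42).nextBytes(random); // incompressible: no codec gains here
        byte[] patterned = new byte[8192];
        for (int i = 0; i < patterned.length; i++) {
            patterned[i] = (byte) (i % 16); // repeating pattern compresses well
        }
        System.out.println("random grows:    " + (deflate(random) >= random.length));
        System.out.println("pattern shrinks: " + (deflate(patterned) < patterned.length));
    }
}
```

A test fixture that generates patterned (or text-like) payloads instead of random bytes would force the compression-enabled branch to actually run compressed buffers through the code under test.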





[jira] [Assigned] (FLINK-28448) BoundedDataTestBase has potential problem when compression is enable

2022-07-27 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28448:
---

Assignee: Weijie Guo

> BoundedDataTestBase has potential problem when compression is enable
> 
>
> Key: FLINK-28448
> URL: https://issues.apache.org/jira/browse/FLINK-28448
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> BoundedDataTestBase has a potential problem when compression is enabled; I'm 
> sure there is a bug in this part of the code. Unfortunately, the test currently 
> passes because the data generated in the test cannot be compressed by LZ4 
> (i.e., the compressed size is larger than the original size), which makes the 
> compression-enabled branch behave the same as the compression-disabled one, so 
> the bug didn't surface.
> I think there are two steps of work that need to be done:
> 1. Fix the incorrect code
> 2. Generate data that can be properly compressed





[jira] [Updated] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28378:

Parent: (was: FLINK-28374)
Issue Type: Improvement  (was: Sub-task)

> Use larger data reading buffer size for sort-shuffle
> 
>
> Key: FLINK-28378
> URL: https://issues.apache.org/jira/browse/FLINK-28378
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
> Fix For: 1.17.0
>
>
> Currently, for sort shuffle, we always use the network buffer size as the data 
> reading buffer size, which is 32K by default. We can increase this buffer size 
> for better performance.





[jira] [Updated] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28378:

Fix Version/s: 1.17.0

> Use larger data reading buffer size for sort-shuffle
> 
>
> Key: FLINK-28378
> URL: https://issues.apache.org/jira/browse/FLINK-28378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
> Fix For: 1.17.0
>
>
> Currently, for sort shuffle, we always use the network buffer size as the data 
> reading buffer size, which is 32K by default. We can increase this buffer size 
> for better performance.





[jira] [Updated] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28378:

Fix Version/s: (was: 1.16.0)

> Use larger data reading buffer size for sort-shuffle
> 
>
> Key: FLINK-28378
> URL: https://issues.apache.org/jira/browse/FLINK-28378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, for sort shuffle, we always use the network buffer size as the data 
> reading buffer size, which is 32K by default. We can increase this buffer size 
> for better performance.





[jira] [Assigned] (FLINK-28663) Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28663:
---

Assignee: Yingjie Cao

> Allow multiple downstream consumer job vertices sharing the same intermediate 
> dataset at scheduler side
> ---
>
> Key: FLINK-28663
> URL: https://issues.apache.org/jira/browse/FLINK-28663
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>
> Currently, one intermediate dataset can only be consumed by one downstream 
> consumer vertex. If multiple consumer vertices consume the same output of the 
> same upstream vertex, multiple intermediate datasets will be produced. We can 
> optimize this behavior to produce only one intermediate dataset that is shared 
> by multiple consumer vertices. As the first step, we should allow multiple 
> downstream consumer job vertices to share the same intermediate dataset at the 
> scheduler side. (Note that this optimization only works for blocking shuffle, 
> because a pipelined shuffle result partition cannot be consumed multiple 
> times.)





[jira] [Updated] (FLINK-28663) Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-28663:

Parent: FLINK-28374
Issue Type: Sub-task  (was: Improvement)

> Allow multiple downstream consumer job vertices sharing the same intermediate 
> dataset at scheduler side
> ---
>
> Key: FLINK-28663
> URL: https://issues.apache.org/jira/browse/FLINK-28663
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, one intermediate dataset can only be consumed by one downstream 
> consumer vertex. If multiple consumer vertices consume the same output of the 
> same upstream vertex, multiple intermediate datasets will be produced. We can 
> optimize this behavior to produce only one intermediate dataset that is shared 
> by multiple consumer vertices. As the first step, we should allow multiple 
> downstream consumer job vertices to share the same intermediate dataset at the 
> scheduler side. (Note that this optimization only works for blocking shuffle, 
> because a pipelined shuffle result partition cannot be consumed multiple 
> times.)





[jira] [Assigned] (FLINK-28373) Read a full buffer of data per file IO read request for sort-shuffle

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28373:
---

Assignee: Yuxin Tan

> Read a full buffer of data per file IO read request for sort-shuffle
> 
>
> Key: FLINK-28373
> URL: https://issues.apache.org/jira/browse/FLINK-28373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, for sort-based blocking shuffle, the corresponding data readers read 
> shuffle data at buffer granularity. Before compression, each buffer is 32K by 
> default; after compression, the size becomes smaller (possibly less than 10K). 
> For file IO, this is quite small. To achieve better performance and reduce 
> IOPS, we can read more data per IO read request and parse the buffer headers 
> and data in memory.
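
The in-memory parsing step can be sketched with a simplified framing (the `[int length][payload]` layout and all names below are assumptions for illustration, not Flink's actual on-disk format): one large read fetches a chunk covering many small buffers, and the individual buffers are then sliced out of the chunk without further IO.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framing: each shuffle buffer is [int length][payload] on disk.
public class ChunkedReadSketch {

    /** Slices individual buffers out of one large chunk, parsing headers in memory. */
    static List<byte[]> sliceBuffers(byte[] chunk) {
        List<byte[]> buffers = new ArrayList<>();
        ByteBuffer bb = ByteBuffer.wrap(chunk);
        while (bb.remaining() >= 4) {
            int len = bb.getInt();        // parse the buffer header in memory
            byte[] payload = new byte[len];
            bb.get(payload);              // no extra file IO per buffer
            buffers.add(payload);
        }
        return buffers;
    }

    /** Helper that packs payloads into one chunk, as a file region would hold them. */
    static byte[] frame(byte[]... payloads) {
        int total = 0;
        for (byte[] p : payloads) {
            total += 4 + p.length;
        }
        ByteBuffer bb = ByteBuffer.allocate(total);
        for (byte[] p : payloads) {
            bb.putInt(p.length).put(p);
        }
        return bb.array();
    }
}
```

One read request per chunk replaces one read request per (possibly sub-10K) buffer, which is exactly the IOPS reduction the ticket is after.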





[jira] [Assigned] (FLINK-28380) Produce one intermediate dataset for multiple consumers consuming the same data

2022-07-25 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-28380:
---

Assignee: Yingjie Cao

> Produce one intermediate dataset for multiple consumers consuming the same 
> data
> ---
>
> Key: FLINK-28380
> URL: https://issues.apache.org/jira/browse/FLINK-28380
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Runtime / Coordination, Runtime 
> / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
> Fix For: 1.16.0
>
>
> Currently, if one output of an upstream job vertex is consumed by multiple 
> downstream job vertices, the upstream vertex will produce multiple datasets. 
> For blocking shuffle, this means serializing and persisting the same data 
> multiple times. This ticket aims to optimize that behavior so the upstream job 
> vertex produces one dataset which is read by multiple downstream vertices.





  1   2   3   4   5   6   7   8   >