[jira] [Commented] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-28 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370454#comment-17370454
 ] 

Gen Luo commented on FLINK-22945:
-

Hi [~zhuzh],

I'd like to work on this, would you please assign the ticket to me?

By the way, I've tried making the resource assignment or the new-slots notification 
asynchronous to break the recursive loop in the slot assignment call stack, but 
found that the slot status maintenance would become more complex and might lead 
to unexpected issues. I suggest not modifying the call stack but only avoiding 
entering the recursive loop. What do you think?
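
For illustration, here is a minimal, self-contained sketch (not Flink code; all 
names are hypothetical) of how fulfilling the next pending request synchronously 
from a slot-release callback adds one stack frame per vertex, which is how the 
StackOverflowError below arises when there are many vertices:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Toy model only: each "pending request" is a future whose completion handler
// rejects the slot (the vertex is already CANCELED) and synchronously fulfills
// the next pending request with the returned slot.
public class RecursiveFulfillSketch {
    public static void main(String[] args) {
        Deque<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();
        for (int i = 0; i < 100_000; i++) { // "many vertices"
            CompletableFuture<String> request = new CompletableFuture<>();
            request.thenRun(() -> {
                // Assignment fails immediately, the slot is returned and used
                // to fulfill the next pending request in the same call stack.
                CompletableFuture<String> next = pendingRequests.poll();
                if (next != null) {
                    next.complete("returned slot"); // synchronous: one more stack frame
                }
            });
            pendingRequests.add(request);
        }
        // Fulfilling the first request triggers the whole recursive chain and
        // eventually throws a StackOverflowError.
        pendingRequests.poll().complete("slot");
    }
}
{code}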

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: auto-deprioritized-critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, even though all vertices will be canceled 
> and their assigned slots returned. A returned slot can be used to fulfill the 
> pending request of a CANCELED vertex; the assignment then fails immediately, 
> the slot is returned again and used to fulfill another vertex, and so on 
> recursively. When there are many vertices this can cause a StackOverflowError, 
> which is a fatal error and makes the JM crash. A sample call stack is attached 
> below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to avoid 
> similar StackOverflowExceptions.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.

[jira] [Created] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout

2021-07-01 Thread Gen Luo (Jira)
Gen Luo created FLINK-23216:
---

 Summary: RM keeps allocating and freeing slots after a TM lost 
until its heartbeat timeout
 Key: FLINK-23216
 URL: https://issues.apache.org/jira/browse/FLINK-23216
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.1
Reporter: Gen Luo


In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
freeing slots with a new TM when it's notified by YARN that a TM is lost. The 
behavior continues until the JM marks the TM as FAILED when its heartbeat 
timeout is reached. It can be easily reproduced by enlarging akka.ask.timeout 
and heartbeat.timeout, for example to 10 min.

After tracking, we find the procedure is like this:

1. When a TM is killed, YARN will first receive the event and notify the RM.
2. In Flink 1.13, the RM uses declarative resource management to manage the 
slots. It will find a lack of resources when receiving the notification, and 
then request a new TM from YARN.
3. The RM will then require the new TM to connect and offer slots to the JM.
4. But from the JM's point of view, all slots are fulfilled, since the lost TM 
is not considered disconnected until the heartbeat timeout is reached, so the 
JM will reject all slot offers.
5. The new TM will find no slots serving the JM, and will then disconnect from 
the JM.
6. The RM will then find a lack of resources again and go back to step 3, 
requiring the new TM to connect and offer slots to the JM, but it won't request 
another new TM from YARN.

The original log is lost, but it looks like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.

...(several similar lines repeated for different slots)...

o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
container_xxx for job xxx.

...(several similar lines repeated for different slots)...

This could be fixed in several ways, such as notifying the JM as well when the 
RM receives a TM-lost notification, or having TMs not offer slots until 
required, etc. But all these ways have side effects, so further discussion may 
be needed.

Besides, this should no longer be an issue after FLINK-23209 is done.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout

2021-07-05 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-23216:

Description: 
In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
freeing slots with a new TM when it's notified by YARN that a TM is lost. The 
behavior continues until the JM marks the TM as FAILED when its heartbeat 
timeout is reached. It can be easily reproduced by enlarging akka.ask.timeout 
and heartbeat.timeout, for example to 10 min.

After tracking, we find the procedure is like this:

1. When a TM is killed, YARN will first receive the event and notify the RM.
2. In Flink 1.13, the RM uses declarative resource management to manage the 
slots. It will find a lack of resources when receiving the notification, and 
then request a new TM from YARN.
3. The RM will then require the new TM to connect and offer slots to the JM.
4. But from the JM's point of view, all slots are fulfilled, since the lost TM 
is not considered disconnected until the heartbeat timeout is reached, so the 
JM will reject all slot offers.
5. The new TM will find no slots serving the JM, and will then disconnect from 
the JM.
6. The RM will then find a lack of resources again and go back to step 3, 
requiring the new TM to connect and offer slots to the JM, but it won't request 
another new TM from YARN.

The original log is lost, but it looks like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.

...(several similar lines repeated for different slots)...

o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
container_xxx for job xxx.

...(several similar lines repeated for different slots)...

This could be fixed in several ways, such as notifying the JM as well when the 
RM receives a TM-lost notification, or having TMs not offer slots until 
required, etc. But all these ways have side effects, so further discussion may 
be needed.

 

  was:
In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
freeing slots with a new TM when it's notified by YARN that a TM is lost. The 
behavior continues until the JM marks the TM as FAILED when its heartbeat 
timeout is reached. It can be easily reproduced by enlarging akka.ask.timeout 
and heartbeat.timeout, for example to 10 min.

After tracking, we find the procedure is like this:

1. When a TM is killed, YARN will first receive the event and notify the RM.
2. In Flink 1.13, the RM uses declarative resource management to manage the 
slots. It will find a lack of resources when receiving the notification, and 
then request a new TM from YARN.
3. The RM will then require the new TM to connect and offer slots to the JM.
4. But from the JM's point of view, all slots are fulfilled, since the lost TM 
is not considered disconnected until the heartbeat timeout is reached, so the 
JM will reject all slot offers.
5. The new TM will find no slots serving the JM, and will then disconnect from 
the JM.
6. The RM will then find a lack of resources again and go back to step 3, 
requiring the new TM to connect and offer slots to the JM, but it won't request 
another new TM from YARN.

The original log is lost, but it looks like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.

...(several similar lines repeated for different slots)...

o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
container_xxx for job xxx.

...(several similar lines repeated for different slots)...

This could be fixed in several ways, such as notifying the JM as well when the 
RM receives a TM-lost notification, or having TMs not offer slots until 
required, etc. But all these ways have side effects, so further discussion may 
be needed.

Besides, this should no longer be an issue after FLINK-23209 is done.

 


> RM keeps allocating and freeing slots after a TM lost until its heartbeat 
> timeout
> -
>
> Key: FLINK-23216
> URL: https://issues.apache.org/jira/browse/FLINK-23216
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Gen Luo
>Priority: Major
>
> In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
> freeing slots with a new TM when it's notified by YARN that a TM is lost. The 
> behavior continues until the JM marks the TM as FAILED when its heartbeat 
> timeout is reached. It can be easily reproduced by enlarging akka.ask.timeout 
> and heartbeat.timeout, for example to 10 min.
>  
> After tracking, we find the procedure is like this:
> 1. When a TM is killed, YARN will first receive the event and notify the RM.
> 2. In Flink 1.13, the RM uses declarative resource management to manage the 
> slots. It will find a lack of resources when receiving the notification, and 
> then request a new TM from YARN.
> 3. The RM will then require the new TM to connect and offer slots to the JM.
> 4. But from the JM's point of view, all slots are fulfilled, since the lo

[jira] [Commented] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout

2021-07-05 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374650#comment-17374650
 ] 

Gen Luo commented on FLINK-23216:
-

I agree that the improvement should be done regardless of FLINK-23209; they are 
independent things. I'll modify the description.

> RM keeps allocating and freeing slots after a TM lost until its heartbeat 
> timeout
> -
>
> Key: FLINK-23216
> URL: https://issues.apache.org/jira/browse/FLINK-23216
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Gen Luo
>Priority: Major
>
> In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
> freeing slots with a new TM when it's notified by YARN that a TM is lost. The 
> behavior continues until the JM marks the TM as FAILED when its heartbeat 
> timeout is reached. It can be easily reproduced by enlarging akka.ask.timeout 
> and heartbeat.timeout, for example to 10 min.
>  
> After tracking, we find the procedure is like this:
> 1. When a TM is killed, YARN will first receive the event and notify the RM.
> 2. In Flink 1.13, the RM uses declarative resource management to manage the 
> slots. It will find a lack of resources when receiving the notification, and 
> then request a new TM from YARN.
> 3. The RM will then require the new TM to connect and offer slots to the JM.
> 4. But from the JM's point of view, all slots are fulfilled, since the lost TM 
> is not considered disconnected until the heartbeat timeout is reached, so the 
> JM will reject all slot offers.
> 5. The new TM will find no slots serving the JM, and will then disconnect from 
> the JM.
> 6. The RM will then find a lack of resources again and go back to step 3, 
> requiring the new TM to connect and offer slots to the JM, but it won't 
> request another new TM from YARN.
>  
> The original log is lost, but it looks like this:
> o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.
> ...(several similar lines repeated for different slots)...
> o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
> container_xxx for job xxx.
> ...(several similar lines repeated for different slots)...
>  
> This could be fixed in several ways, such as notifying the JM as well when the 
> RM receives a TM-lost notification, or having TMs not offer slots until 
> required, etc. But all these ways have side effects, so further discussion may 
> be needed.
> Besides, this should no longer be an issue after FLINK-23209 is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch

2022-06-06 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550823#comment-17550823
 ] 

Gen Luo commented on FLINK-27910:
-

This is a mistake made while migrating the FileSink to the new sink API. In the 
new sink API, createWriter and restoreWriter have been separated into two 
methods, whereas originally a writer was created by calling restoreWriter with 
an empty state collection. We mistook the meaning of createWriter and only 
created a writer in it.
A PR has been provided to fix this bug; it changes createWriter to follow the 
original behavior.
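
For illustration, a minimal sketch of the described fix, using hypothetical 
stand-ins rather than the actual FileSink classes or the real sink interfaces:

{code:java}
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-ins only, not the actual FileSink code.
public class CreateWriterSketch {

    interface WriterFactory<StateT> {
        // Restoring performs all initialization, including registering the
        // timer that enforces the rolling policy.
        String restoreWriter(Collection<StateT> recoveredState);

        // The fix described above: creating a writer from scratch behaves like
        // restoring from an empty state, so the timer is registered in both paths.
        default String createWriter() {
            return restoreWriter(Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        WriterFactory<Long> factory = recovered ->
                "writer initialized, " + recovered.size() + " state objects recovered, timer registered";
        System.out.println(factory.createWriter());                               // fresh start
        System.out.println(factory.restoreWriter(Collections.singletonList(1L))); // restore
    }
}
{code}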

> FileSink not registered the timer to enforce rolling policy if started from 
> scratch
> ---
>
> Key: FLINK-27910
> URL: https://issues.apache.org/jira/browse/FLINK-27910
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: pull-request-available
>
> The current FileWriter only registers the timer in initializeState, which is 
> now only called on restoring. Thus if the job is started from scratch, the 
> timer fails to be registered, causing the rolling policy not to work.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25937) SQL Client end-to-end test e2e fails on AZP

2022-02-09 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489997#comment-17489997
 ] 

Gen Luo commented on FLINK-25937:
-

The reason has been identified.

The parallelism of a transformation with default parallelism (-1) is set when 
it is transformed, using the default parallelism of the environment. However, 
in SinkExpander#expand, the environment parallelism is set to -1 at the 
entrance, in order to verify whether the parallelism of an expanded 
transformation has been set explicitly. The environment parallelism is restored 
when exiting the method, but at present the transformation happens within this 
scope. So if the parallelism of a sink is not set, the parallelism of the sink 
transformation and of all transformations expanded from it will not be 
resolved, and the generated JobGraph will have vertices with -1 parallelism, 
causing the assertion failure in AdaptiveScheduler.

We can fix the bug by restoring the environment parallelism before transforming 
the sink transformations. The pull request has been created and has been 
verified with UpsertKafkaTableITCase.
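
For illustration, a toy model of the ordering issue described above; the names 
and structure are simplified stand-ins, not the actual SinkExpander code:

{code:java}
import java.util.List;

// Toy model only: a "transformation" is just its declared parallelism, and -1
// means "use the environment default at the moment it is transformed".
public class SinkExpansionOrderSketch {

    static int environmentParallelism = 4;

    static int transform(int declaredParallelism) {
        return declaredParallelism == -1 ? environmentParallelism : declaredParallelism;
    }

    static void expand(List<Integer> expandedTransformations, boolean restoreBeforeTransform) {
        int original = environmentParallelism;
        environmentParallelism = -1; // sentinel used to detect explicitly set parallelism

        if (restoreBeforeTransform) {
            environmentParallelism = original; // the fix: restore before transforming
        }
        expandedTransformations.forEach(p -> System.out.println("resolved parallelism: " + transform(p)));

        environmentParallelism = original;
    }

    public static void main(String[] args) {
        // A sink without explicit parallelism plus an expanded transformation with it set.
        List<Integer> expanded = List.of(-1, 2);
        expand(expanded, false); // buggy ordering: -1 leaks into the JobGraph vertices
        expand(expanded, true);  // fixed ordering: -1 resolves to the environment default
    }
}
{code}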

> SQL Client end-to-end test e2e fails on AZP
> ---
>
> Key: FLINK-25937
> URL: https://issues.apache.org/jira/browse/FLINK-25937
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, API / DataStream, Runtime / Coordination, 
> Table SQL / API
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Gen Luo
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> The {{SQL Client end-to-end test}} e2e test fails on AZP when using the 
> {{AdaptiveScheduler}} because the scheduler expects the parallelism to be set 
> for all vertices:
> {code}
> Feb 03 03:45:13 org.apache.flink.runtime.client.JobInitializationException: 
> Could not start the JobMaster.
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> Feb 03 03:45:13   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> Feb 03 03:45:13   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> Feb 03 03:45:13   at java.lang.Thread.run(Thread.java:748)
> Feb 03 03:45:13 Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: The adaptive scheduler expects the 
> parallelism being set for each JobVertex (violated JobVertex: 
> f74b775b58627a33e46b8c155b320255).
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> Feb 03 03:45:13   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> Feb 03 03:45:13   ... 3 more
> Feb 03 03:45:13 Caused by: java.lang.IllegalStateException: The adaptive 
> scheduler expects the parallelism being set for each JobVertex (violated 
> JobVertex: f74b775b58627a33e46b8c155b320255).
> Feb 03 03:45:13   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.assertPreconditions(AdaptiveScheduler.java:296)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.(AdaptiveScheduler.java:230)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory.createInstance(AdaptiveSchedulerFactory.java:122)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:322)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
> Feb 03 03:45:13   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
> F

[jira] [Created] (FLINK-24965) Improper usage of Map.Entry after Entry Iterator.remove in TaskLocalStateStoreImpl#pruneCheckpoints

2021-11-19 Thread Gen Luo (Jira)
Gen Luo created FLINK-24965:
---

 Summary: Improper usage of Map.Entry after Entry Iterator.remove 
in TaskLocalStateStoreImpl#pruneCheckpoints
 Key: FLINK-24965
 URL: https://issues.apache.org/jira/browse/FLINK-24965
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Gen Luo


In TaskLocalStateStoreImpl#pruneCheckpoints, a list is created to store the 
snapshots that should be removed, and the entries to remove are directly added 
into the list. The code is like this:
{code:java}
Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
        storedTaskStateByCheckpointID.entrySet().iterator();

while (entryIterator.hasNext()) {
    Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
    long entryCheckpointId = snapshotEntry.getKey();
    if (pruningChecker.test(entryCheckpointId)) {
        toRemove.add(snapshotEntry);
        entryIterator.remove();
    } else if (breakOnceCheckerFalse) {
        break;
    }
} {code}
 

 

However, according to the javadoc of Map.Entry,
{code:java}
the behavior of a map entry is undefined if the backing map has been modified 
after the entry was returned by the iterator, except through the setValue 
operation on the map entry. {code}
entries should not be kept for further use after iterator.remove has been 
called. In this case, where the map is a TreeMap, if the first entry is skipped 
and removal starts from the second element in `storedTaskStateByCheckpointID`, 
the entries returned by entryIterator.next will be the same object and the list 
will be filled with it.

 

A possible fix is to add `new AbstractMap.SimpleEntry<>(snapshotEntry.getKey(), 
snapshotEntry.getValue())` to the list instead of snapshotEntry itself.
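
A minimal, self-contained sketch of this fix, using a String value as a 
stand-in for the real snapshot type:

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PruneCheckpointsSketch {
    public static void main(String[] args) {
        TreeMap<Long, String> storedTaskStateByCheckpointID = new TreeMap<>();
        for (long id = 1; id <= 4; id++) {
            storedTaskStateByCheckpointID.put(id, "snapshot-" + id);
        }

        List<Map.Entry<Long, String>> toRemove = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> entryIterator =
                storedTaskStateByCheckpointID.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<Long, String> snapshotEntry = entryIterator.next();
            if (snapshotEntry.getKey() <= 3) { // stand-in for pruningChecker.test(...)
                // Copy the key/value into an independent entry before remove(),
                // so the copy stays valid after the backing TreeMap is modified.
                toRemove.add(new AbstractMap.SimpleEntry<>(
                        snapshotEntry.getKey(), snapshotEntry.getValue()));
                entryIterator.remove();
            }
        }
        toRemove.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
{code}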

 

The issue is a minor one since all usages of this method seem safe so far.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted

2023-02-02 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683265#comment-17683265
 ] 

Gen Luo commented on FLINK-30719:
-

It seems that the corrupted file has been deleted and the issue is gone. I 
suppose we can close the issue now.

> flink-runtime-web failed due to a corrupted 
> 
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30943) Hide sensitive command-line configurations

2023-02-09 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686320#comment-17686320
 ] 

Gen Luo commented on FLINK-30943:
-

Hiding sensitive configurations is already supported if the key and its value 
are joined in a single argument. This Jira tries to hide values that are 
separated from their keys.
However, I also agree that we don't need to support such cases. Users can 
easily change the argument list to the supported format. Besides, neighboring 
arguments may have no relation to each other, in which case this feature could 
confuse users.

> Hide sensitive command-line configurations
> --
>
> Key: FLINK-30943
> URL: https://issues.apache.org/jira/browse/FLINK-30943
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client
>Reporter: Bo Cui
>Priority: Major
>  Labels: pull-request-available
>
> When a job is submitted from the command line, the log prints JVM options and 
> application dynamic parameters. 
> Currently only dynamic parameters (`-Dxx=yy`) are masked, but key-value 
> separated parameters (`--key value`) are not masked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30387) support job singleton within TM

2023-02-09 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686335#comment-17686335
 ] 

Gen Luo commented on FLINK-30387:
-

As you said, we can use the Flink user classloader: a user-defined singleton 
class can be placed in the user jar, which is certainly loaded by the user 
classloader, and the shared objects can be put in this class. I suppose that is 
enough for this purpose, if I'm not missing anything.
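
For illustration, a minimal sketch of that pattern; the class name and the 
resource are hypothetical. Because the holder class ships in the user jar, it 
is loaded by the per-job user classloader, so the static field is naturally 
scoped to that job within a TM:

{code:java}
// Lives in the user jar, so it is loaded by the job's user classloader.
public final class JobSharedResources {

    private static volatile AutoCloseable sharedResource;

    private JobSharedResources() {}

    public static AutoCloseable getOrCreate() {
        if (sharedResource == null) {
            synchronized (JobSharedResources.class) {
                if (sharedResource == null) {
                    // Hypothetical placeholder for a thread pool, DB connection pool, cache, etc.
                    sharedResource = () -> System.out.println("released");
                }
            }
        }
        return sharedResource;
    }

    public static void main(String[] args) {
        System.out.println(getOrCreate() == getOrCreate()); // true: one instance per classloader
    }
}
{code}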

> support job singleton within TM
> ---
>
> Key: FLINK-30387
> URL: https://issues.apache.org/jira/browse/FLINK-30387
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Core
>Reporter: James Z.M. Gao
>Priority: Major
>
> For better management of shared objects between slots of the same job, such 
> as thread pools, DB connections, cached data, etc., we need the ability to 
> create these objects once per job at the beginning, and then to release the 
> bound resources in the correct order when the job finishes. A simple static 
> field cannot always achieve this when the class is loaded by the TM 
> classloader.
> To implement this job-level singleton, we can use the Flink user classloader. 
> Since each Flink job has its own context classloader, we could add some 
> methods to the user CL to create a user-specified AutoCloseable object only 
> once for each job and close it at the end in the release hook of the user CL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30193) could flink has dynamic rebalance according to the load of downstream operators

2022-11-24 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638336#comment-17638336
 ] 

Gen Luo commented on FLINK-30193:
-

Hi [~landlord],
We did some work like this, which makes the rebalance partitioner able to 
distribute records according to the load of downstream operators, and we are 
considering contributing the code to the community when we can make the time. 
Maybe we can discuss the feature on the mailing list if you'd like. The Jira 
should be created after the discussion on the mailing list reaches a consensus.

> could flink has dynamic rebalance  according to the load of downstream 
> operators 
> -
>
> Key: FLINK-30193
> URL: https://issues.apache.org/jira/browse/FLINK-30193
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.15.2
>Reporter: Lu
>Priority: Major
>
> Different records cause different loads on the downstream, and different task 
> slots have different compute power. So rebalancing according to the load of 
> downstream operators would greatly reduce data skew and improve resource 
> utilization.
> I think the point is how to obtain the load of the downstream. I have looked 
> at StreamPartitioner, and it doesn't have any context or method to get the 
> load information of downstream operators, so ordinary developers cannot easily 
> implement such a function.
> Finally, could Flink provide such a function?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30244) When task using udf/udtf with jni, on k8s session the old TM will shut down and create new TM or the task will fail

2022-12-15 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17647953#comment-17647953
 ] 

Gen Luo commented on FLINK-30244:
-

[~AlexXXX]
Hi, do you have any further questions about this? If so, you may post the 
JobManager log, which I think can help identify the reason. If there are no 
more questions, maybe we can ask [~gyfora] to close the issue.

> When task using udf/udtf with jni, on k8s session the old TM will shut down 
> and create new TM or the task will fail
> ---
>
> Key: FLINK-30244
> URL: https://issues.apache.org/jira/browse/FLINK-30244
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Kubernetes Operator, Runtime / 
> Task
>Affects Versions: 1.15.3
>Reporter: AlexHu
>Priority: Major
> Attachments: image-2022-11-30-14-47-50-923.png, 
> image-2022-11-30-15-00-06-710.png, image-2022-11-30-15-04-45-696.png, 
> image-2022-11-30-15-05-29-120.png
>
>
> We face a problem when we try to use Flink on k8s to execute tasks with 
> udf/udtf. When we finish or cancel a job and then submit the same job again, 
> the old TM becomes unreachable and restarts. Why does the TM have to restart? 
> In session mode, the TM should be reused by the JM. Moreover, if we turn off 
> the restart strategy, the task will fail.
> !image-2022-11-30-14-47-50-923.png!
>  
> On the first submission, the job is running:
> !image-2022-11-30-15-00-06-710.png!
>  
> But after cancelling it and submitting the same job:
> !image-2022-11-30-15-04-45-696.png!
> Internal server error, but in k8s the pod is running.
> !image-2022-11-30-15-05-29-120.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30448) "filter(Objects::nonNull)" will bring down task with failure cause: ClassCastException

2023-01-05 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654893#comment-17654893
 ] 

Gen Luo commented on FLINK-30448:
-

Hi Yong,
I looked into the issue and found that this may be a limitation of the Java 
lambda mechanism.
{code:java}
import java.io.Serializable;
import java.util.Arrays;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.util.InstantiationUtil;

class Test implements Serializable {

    static boolean nonNull(Object x) {
        return false;
    }

    public static void main(String[] args) throws Exception {
        Test a = new Test();
        FilterFunction<Object> f1 = Test::nonNull;
        FilterFunction<Object> f2 = Test::nonNull;
        System.out.println(f1 == f2);

        byte[] b1 = InstantiationUtil.serializeObject(f1);
        byte[] b2 = InstantiationUtil.serializeObject(f2);
        System.out.println(Arrays.equals(b1, b2));

        FilterFunction<Object> df1 =
                InstantiationUtil.deserializeObject(
                        b1, Thread.currentThread().getContextClassLoader());
        FilterFunction<Object> df2 =
                InstantiationUtil.deserializeObject(
                        b2, Thread.currentThread().getContextClassLoader());
        System.out.println(df1 == df2);
    }
}
{code}
Running the code above with nonNull as a static method in the Test class, the 
results are (false, true, true).
If you make nonNull a non-static method instead, the results are (false, true, 
false).

After looking into the internal calls, I found that when the lambda is 
deserialized, there is a SerializedLambda. In both cases the SerializedLambda 
objects are different. But the SerializedLambda is not the final result: it 
still has to be converted to a FilterFunction. To achieve this, a method called 
"$deserializeLambda$" is invoked with the SerializedLambda during 
deserialization. After that, if the referenced method is not static, the 
results are different instances, while if the method is static, the results are 
exactly the same instance. So when Flink calls that single filter instance with 
two different types of data (Child1 and Child2), the second call causes the 
ClassCastException.

On the other hand, if the FilterFunctions are initialized with the code 
`x -> x != null`, there will be two distinct lambda classes, so that case is 
fine.

Unfortunately I can find little documentation about the $deserializeLambda$ 
method. Here's my supposition: maybe it applies an optimization that returns a 
singleton instance for a lambda backed by a static method. That works well for 
non-generic classes but fails here.

In short, I suppose this is a limitation of the Java lambda mechanism and Flink 
can do nothing to avoid it. You have to use the `x -> x != null` pattern, or 
use a non-static method, to avoid the exception.
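
A small sketch of the first workaround (lambda expressions instead of a method 
reference to a static method); each lambda expression gets its own generated 
class, so the two filters stay distinct:

{code:java}
import org.apache.flink.api.common.functions.FilterFunction;

public class LambdaFilterSketch {
    public static void main(String[] args) throws Exception {
        FilterFunction<Object> child1Filter = x -> x != null;
        FilterFunction<Object> child2Filter = x -> x != null;
        // Two distinct generated classes, unlike the Test::nonNull case above.
        System.out.println(child1Filter.getClass() + " / " + child2Filter.getClass());
        System.out.println(child1Filter.filter(null)); // false
    }
}
{code}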

> "filter(Objects::nonNull)" will bring down task with failure cause: 
> ClassCastException
> --
>
> Key: FLINK-30448
> URL: https://issues.apache.org/jira/browse/FLINK-30448
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
> Environment: test both on flink 1.15.1 and flink 1.16.0
> Intellij-Idea dev environment run
>Reporter: Yong
>Priority: Major
> Attachments: TestSideOutput.java
>
>
> Attached is an *all-in-one* java program, which can run locally in a DEV 
> environment (e.g. IntelliJ IDEA -> run), consuming stream objects from 
> elements. The object schema is a parent containing two child fields (Child1 
> and Child2); I use *side-output* to map and split out two different 
> sub-streams, one for each child. I put '{*}filter(Objects::nonNull){*}' on 
> each sub-stream to ignore null objects. When a parent record coming from the 
> stream {*}has either child field set to null{*}, the program brings down the 
> task and produces the error below:
> ..
> switched from RUNNING to FAILED with failure cause: 
> java.lang.{*}ClassCastException{*}: mytest.TestSideOutput$Child2 cannot be 
> cast to mytest.TestSideOutput$Child1. Failed to push OutputTag with id 
> 'child2' to operator. This can occur when multiple OutputTags with different 
> types but identical names are being used.
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
> ..
>  
> However, if I replace '{*}filter(Objects::nonNull){*}' (at lines #71 and #90) 
> with the logically equivalent '{*}filter(x->x!=null){*}' (at lines #70 and 
> #89), everything works fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28587) FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-18 Thread Gen Luo (Jira)
Gen Luo created FLINK-28587:
---

 Summary: FLIP-249: Flink Web UI Enhancement for Speculative 
Execution
 Key: FLINK-28587
 URL: https://issues.apache.org/jira/browse/FLINK-28587
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / REST, Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Gen Luo


As a follow-up step of FLIP-168 and FLIP-224, the Flink Web UI needs to be 
enhanced to display the related information if the speculative execution 
mechanism is enabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28588) Enhance REST API for Speculative Execution

2022-07-18 Thread Gen Luo (Jira)
Gen Luo created FLINK-28588:
---

 Summary: Enhance REST API for Speculative Execution
 Key: FLINK-28588
 URL: https://issues.apache.org/jira/browse/FLINK-28588
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / REST
Affects Versions: 1.16.0
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28589) Enhance Web UI for Speculative Execution

2022-07-18 Thread Gen Luo (Jira)
Gen Luo created FLINK-28589:
---

 Summary: Enhance Web UI for Speculative Execution
 Key: FLINK-28589
 URL: https://issues.apache.org/jira/browse/FLINK-28589
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28864) DynamicPartitionPruningRule#isNewSource should check if the source used by the DataStreamScanProvider is actually a new source

2022-08-08 Thread Gen Luo (Jira)
Gen Luo created FLINK-28864:
---

 Summary: DynamicPartitionPruningRule#isNewSource should check if 
the source used by the DataStreamScanProvider is actually a new source
 Key: FLINK-28864
 URL: https://issues.apache.org/jira/browse/FLINK-28864
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Gen Luo


DynamicPartitionPruningRule#isNewSource assumes that a DataStreamScanProvider 
that supports dynamic filtering uses the new source as its source, but that is 
not reliable. For better compatibility, the method should acquire the source 
transformation from the translated DataStream and check whether the source is 
actually a new source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29927) AkkaUtils#getAddress may cause memory leak

2022-11-08 Thread Gen Luo (Jira)
Gen Luo created FLINK-29927:
---

 Summary: AkkaUtils#getAddress may cause memory leak
 Key: FLINK-29927
 URL: https://issues.apache.org/jira/browse/FLINK-29927
 Project: Flink
  Issue Type: Bug
Reporter: Gen Luo
 Attachments: 截屏2022-11-08 下午5.19.38.png

We found a slow memory leak in the JM. When MetricFetcherImpl tries to retrieve 
metrics, it always calls MetricQueryServiceRetriever#retrieveService first. 
That method acquires the address of a task manager, which uses 
AkkaUtils#getAddress internally. The getAddress method is implemented like 
this:

{code:java}
public static Address getAddress(ActorSystem system) {
return new RemoteAddressExtension().apply(system).getAddress();
}
{code}

and the RemoteAddressExtension#apply is like this:

{code:scala}
  def apply(system: ActorSystem): T = {
    java.util.Objects.requireNonNull(system, "system must not be null!").registerExtension(this)
  }
{code}

This means every call of AkkaUtils#getAddress registers a new extension to the 
ActorSystem, and that extension can never be released until the ActorSystem 
exits.

Most usages of the method happen only once during initialization, but as 
described above, MetricFetcherImpl also uses it. This can happen periodically 
while users have the WebUI open, or whenever users call the REST API directly 
to get metrics. This means the memory may keep leaking.

The leak may have been introduced in FLINK-23662 when porting the Scala version 
of AkkaUtils to Java, though I'm not sure whether the Scala version has the 
same issue.

The leak seems very slow. We observed it on a job running for more than one 
month with only 1 GB of memory for the job manager. So I suppose it's not an 
emergency, but it still needs to be fixed.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29927) AkkaUtils#getAddress may cause memory leak

2022-11-08 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-29927:

Attachment: (was: 截屏2022-11-08 下午5.19.38.png)

> AkkaUtils#getAddress may cause memory leak
> --
>
> Key: FLINK-29927
> URL: https://issues.apache.org/jira/browse/FLINK-29927
> Project: Flink
>  Issue Type: Bug
>Reporter: Gen Luo
>Priority: Major
>
> We found a slow memory leak in the JM. When MetricFetcherImpl tries to 
> retrieve metrics, it always calls MetricQueryServiceRetriever#retrieveService 
> first. That method acquires the address of a task manager, which uses 
> AkkaUtils#getAddress internally. The getAddress method is implemented like 
> this:
> {code:java}
> public static Address getAddress(ActorSystem system) {
> return new RemoteAddressExtension().apply(system).getAddress();
> }
> {code}
> and RemoteAddressExtension#apply is like this:
> {code:scala}
>   def apply(system: ActorSystem): T = {
>     java.util.Objects.requireNonNull(system, "system must not be null!").registerExtension(this)
>   }
> {code}
> This means every call of AkkaUtils#getAddress registers a new extension to 
> the ActorSystem, and that extension can never be released until the 
> ActorSystem exits.
> Most usages of the method happen only once during initialization, but as 
> described above, MetricFetcherImpl also uses it. This can happen periodically 
> while users have the WebUI open, or whenever users call the REST API directly 
> to get metrics. This means the memory may keep leaking. 
> The leak may have been introduced in FLINK-23662 when porting the Scala 
> version of AkkaUtils to Java, though I'm not sure whether the Scala version 
> has the same issue.
> The leak seems very slow. We observed it on a job running for more than one 
> month with only 1 GB of memory for the job manager. So I suppose it's not an 
> emergency, but it still needs to be fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29927) AkkaUtils#getAddress may cause memory leak

2022-11-08 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-29927:

Attachment: RemoteAddressExtensionLeaking.png

> AkkaUtils#getAddress may cause memory leak
> --
>
> Key: FLINK-29927
> URL: https://issues.apache.org/jira/browse/FLINK-29927
> Project: Flink
>  Issue Type: Bug
>Reporter: Gen Luo
>Priority: Major
> Attachments: RemoteAddressExtensionLeaking.png
>
>
> We found a slow memory leak in the JM. When MetricFetcherImpl tries to 
> retrieve metrics, it always calls MetricQueryServiceRetriever#retrieveService 
> first. That method acquires the address of a task manager, which uses 
> AkkaUtils#getAddress internally. The getAddress method is implemented like 
> this:
> {code:java}
> public static Address getAddress(ActorSystem system) {
> return new RemoteAddressExtension().apply(system).getAddress();
> }
> {code}
> and RemoteAddressExtension#apply is like this:
> {code:scala}
>   def apply(system: ActorSystem): T = {
>     java.util.Objects.requireNonNull(system, "system must not be null!").registerExtension(this)
>   }
> {code}
> This means every call of AkkaUtils#getAddress registers a new extension to 
> the ActorSystem, and that extension can never be released until the 
> ActorSystem exits.
> Most usages of the method happen only once during initialization, but as 
> described above, MetricFetcherImpl also uses it. This can happen periodically 
> while users have the WebUI open, or whenever users call the REST API directly 
> to get metrics. This means the memory may keep leaking. 
> The leak may have been introduced in FLINK-23662 when porting the Scala 
> version of AkkaUtils to Java, though I'm not sure whether the Scala version 
> has the same issue.
> The leak seems very slow. We observed it on a job running for more than one 
> month with only 1 GB of memory for the job manager. So I suppose it's not an 
> emergency, but it still needs to be fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-26180) Update docs to introduce the compaction for FileSink

2022-02-16 Thread Gen Luo (Jira)
Gen Luo created FLINK-26180:
---

 Summary: Update docs to introduce the compaction for FileSink
 Key: FLINK-26180
 URL: https://issues.apache.org/jira/browse/FLINK-26180
 Project: Flink
  Issue Type: Sub-task
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26235) CompactingFileWriter and PendingFileRecoverable should not be exposed to users.

2022-02-17 Thread Gen Luo (Jira)
Gen Luo created FLINK-26235:
---

 Summary: CompactingFileWriter and PendingFileRecoverable should 
not be exposed to users.
 Key: FLINK-26235
 URL: https://issues.apache.org/jira/browse/FLINK-26235
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26235) CompactingFileWriter and PendingFileRecoverable should not be exposed to users.

2022-02-17 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-26235:

Description: In FLINK-25583, we added a CompactingFileWriter to write the 
compacted files. The writer is exposed to users in the FileCompactor, but it 
has a `closeForCommit` function and relies on the `PendingFileRecoverable`, 
both of which should not be exposed to users. We should fix this and annotate 
them as Internal.

> CompactingFileWriter and PendingFileRecoverable should not be exposed to 
> users.
> ---
>
> Key: FLINK-26235
> URL: https://issues.apache.org/jira/browse/FLINK-26235
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
> Fix For: 1.15.0
>
>
> In FLINK-25583, we added a CompactingFileWriter to write the compacted files. 
> The writer is exposed to users in the FileCompactor, but it has a 
> `closeForCommit` function and relies on the `PendingFileRecoverable`, both 
> of which should not be exposed to users. We should fix this and annotate them 
> as Internal.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26233) FileSinkCompactionSwitchITCase.testSwitchingCompaction() fails in CI

2022-02-18 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494456#comment-17494456
 ] 

Gen Luo commented on FLINK-26233:
-

This is because when the compaction is switched from on to off, the pending 
files remaining in the compactor state will be flushed to the committer at the 
first checkpoint after restarting. So the state of the committer can be larger 
than in the stable status, which in this test case means more than 5 MB.

I have created a PR to fix this by reducing the speed of the source and using 
the FileSystemCheckpointStorage.
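
For reference, a sketch of the second part of that change, using the standard 
checkpoint storage configuration API (the path and interval here are only 
examples):

{code:java}
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointStorageSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        // Keep checkpoint state in files instead of the JM heap, so a temporarily
        // larger committer state does not hit the in-memory state size limit.
        env.getCheckpointConfig()
                .setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoints"));
    }
}
{code}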

> FileSinkCompactionSwitchITCase.testSwitchingCompaction() fails in CI
> 
>
> Key: FLINK-26233
> URL: https://issues.apache.org/jira/browse/FLINK-26233
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-02-17T20:13:20.2895110Z Feb 17 20:13:20 [INFO] Running 
> org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase
> 2022-02-17T20:13:40.2160260Z Feb 17 20:13:40 [INFO] Tests run: 2, Failures: 
> 0, Errors: 0, Skipped: 0, Time elapsed: 19.905 s - in 
> org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase
> 2022-02-17T20:13:58.8860609Z Feb 17 20:13:58 [ERROR] Tests run: 2, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 102.488 s <<< FAILURE! - in 
> org.apache.flink.connector.file.sink.FileSinkCompactionSwitchITCase
> 2022-02-17T20:13:58.8864562Z Feb 17 20:13:58 [ERROR] 
> FileSinkCompactionSwitchITCase.testSwitchingCompaction  Time elapsed: 37.28 s 
>  <<< ERROR!
> 2022-02-17T20:13:58.8865526Z Feb 17 20:13:58 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2022-02-17T20:13:58.8866319Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2022-02-17T20:13:58.8867102Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:934)
> 2022-02-17T20:13:58.8867985Z Feb 17 20:13:58     at 
> org.apache.flink.connector.file.sink.FileSinkCompactionSwitchITCase.testSwitchingCompaction(FileSinkCompactionSwitchITCase.java:175)
> [...]
> 2022-02-17T20:13:58.8919634Z Feb 17 20:13:58 Caused by: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> [...]
> 2022-02-17T20:13:58.8939468Z Feb 17 20:13:58 Caused by: 
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
> failure threshold.
> 2022-02-17T20:13:58.8940119Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:160)
> 2022-02-17T20:13:58.8940863Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:145)
> 2022-02-17T20:13:58.8941613Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:97)
> 2022-02-17T20:13:58.8942321Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2046)
> 2022-02-17T20:13:58.8943011Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1040)
> 2022-02-17T20:13:58.8943830Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
> 2022-02-17T20:13:58.8944567Z Feb 17 20:13:58     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
> 2022-02-17T20:13:58.8945240Z Feb 17 20:13:58     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2022-02-17T20:13:58.8945794Z Feb 17 20:13:58     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2022-02-17T20:13:58.8946276Z Feb 17 20:13:58     at 
> java.lang.Thread.run(Thread.java:748)  {code}
> https://dev.azure.com/alexanderfedulov/Flink/_build/results?buildId=37&view=logs&j=dafbab6d-4616-5d7b-ee37-3c54e4828fd7&t=e204f081-e6cd-5c04-4f4c-919639b63be9&l=2



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26314) StreamingCompactingFileSinkITCase.testFileSink failed on azure

2022-02-23 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497225#comment-17497225
 ] 

Gen Luo commented on FLINK-26314:
-

I'll take a look soon.

> StreamingCompactingFileSinkITCase.testFileSink failed on azure
> --
>
> Key: FLINK-26314
> URL: https://issues.apache.org/jira/browse/FLINK-26314
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> Feb 22 13:34:32 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 12.735 s <<< FAILURE! - in 
> org.apache.flink.connector.file.sink.StreamingCompactingFileSinkITCase
> Feb 22 13:34:32 [ERROR] StreamingCompactingFileSinkITCase.testFileSink  Time 
> elapsed: 3.311 s  <<< FAILURE!
> Feb 22 13:34:32 java.lang.AssertionError: The record 6788 should occur 4 
> times,  but only occurs 3time expected:<4> but was:<3>
> Feb 22 13:34:32   at org.junit.Assert.fail(Assert.java:89)
> Feb 22 13:34:32   at org.junit.Assert.failNotEquals(Assert.java:835)
> Feb 22 13:34:32   at org.junit.Assert.assertEquals(Assert.java:647)
> Feb 22 13:34:32   at 
> org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(IntegerFileSinkTestDataUtils.java:155)
> Feb 22 13:34:32   at 
> org.apache.flink.connector.file.sink.FileSinkITBase.testFileSink(FileSinkITBase.java:84)
> Feb 22 13:34:32   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Feb 22 13:34:32   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Feb 22 13:34:32   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Feb 22 13:34:32   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 22 13:34:32   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Feb 22 13:34:32   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Feb 22 13:34:32   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Feb 22 13:34:32   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Feb 22 13:34:32   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Feb 22 13:34:32   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Feb 22 13:34:32   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Feb 22 13:34:32   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Feb 22 13:34:32   at org.junit.runners.Suite.runChild(Suite.java:128)
> Feb 22 13:34:32   at org.junit.runners.Suite.runChild(Suite.java:27)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32023&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=11103



--
This message was sent by Atlassian Jira
(v8.2

[jira] [Commented] (FLINK-26314) StreamingCompactingFileSinkITCase.testFileSink failed on azure

2022-02-24 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497887#comment-17497887
 ] 

Gen Luo commented on FLINK-26314:
-

I found that the PseudoRandomValueSelector is selecting PT0S for 
execution.checkpointing.alignment-timeout and true for 
execution.checkpointing.unaligned for this test. I can reproduce these issues 
with these options. It seems that the compactor for FileSink cannot work with 
unaligned checkpoints at present.

I will first create a PR with a hotfix for the test that disables unaligned 
checkpoints explicitly, then work on a bugfix to make the compactor support 
unaligned checkpoints.
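
For reference, a sketch of the hotfix part, using the standard checkpointing 
configuration API to force aligned checkpoints regardless of what the 
randomized value selector picks:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableUnalignedCheckpointsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
    }
}
{code}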

> StreamingCompactingFileSinkITCase.testFileSink failed on azure
> --
>
> Key: FLINK-26314
> URL: https://issues.apache.org/jira/browse/FLINK-26314
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Gen Luo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> Feb 22 13:34:32 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 12.735 s <<< FAILURE! - in 
> org.apache.flink.connector.file.sink.StreamingCompactingFileSinkITCase
> Feb 22 13:34:32 [ERROR] StreamingCompactingFileSinkITCase.testFileSink  Time 
> elapsed: 3.311 s  <<< FAILURE!
> Feb 22 13:34:32 java.lang.AssertionError: The record 6788 should occur 4 
> times,  but only occurs 3time expected:<4> but was:<3>
> Feb 22 13:34:32   at org.junit.Assert.fail(Assert.java:89)
> Feb 22 13:34:32   at org.junit.Assert.failNotEquals(Assert.java:835)
> Feb 22 13:34:32   at org.junit.Assert.assertEquals(Assert.java:647)
> Feb 22 13:34:32   at 
> org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(IntegerFileSinkTestDataUtils.java:155)
> Feb 22 13:34:32   at 
> org.apache.flink.connector.file.sink.FileSinkITBase.testFileSink(FileSinkITBase.java:84)
> Feb 22 13:34:32   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Feb 22 13:34:32   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Feb 22 13:34:32   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Feb 22 13:34:32   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 22 13:34:32   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Feb 22 13:34:32   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Feb 22 13:34:32   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Feb 22 13:34:32   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Feb 22 13:34:32   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Feb 22 13:34:32   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Feb 22 13:34:32   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Feb 22 13:34:32   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Feb 22 13:34:32   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Feb 22 13:34:32   at org.junit.runners.Suite.runChild(Suite.java:128)
> Feb 22 13:34:32   at org.junit.runners.Suite.runChild(Suite.java:27)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Feb 22 13:34:32   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Feb 22 13:34:32   at 
> org

[jira] [Created] (FLINK-28240) NettyShuffleMetricFactory#RequestedMemoryUsageMetric#getValue may throw ArithmeticException when the total segments of NetworkBufferPool is 0

2022-06-24 Thread Gen Luo (Jira)
Gen Luo created FLINK-28240:
---

 Summary: 
NettyShuffleMetricFactory#RequestedMemoryUsageMetric#getValue may throw 
ArithmeticException when the total segments of NetworkBufferPool is 0
 Key: FLINK-28240
 URL: https://issues.apache.org/jira/browse/FLINK-28240
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.15.0
Reporter: Gen Luo


In a single-vertex job, the network memory can be set to 0 since the job 
doesn't need it, and in this case the totalNumberOfMemorySegments of the 
NetworkBufferPool will also be 0.

However, NettyShuffleMetricFactory#RequestedMemoryUsageMetric#getValue uses the 
totalNumberOfMemorySegments of the NetworkBufferPool as the divisor without 
validating it, so an ArithmeticException will be thrown when 
totalNumberOfMemorySegments is 0.

Since 0 network memory is in fact valid for a single-vertex job, I suppose 
RequestedMemoryUsageMetric#getValue should check whether the divisor is 0 and 
return 0 as the usage directly in such cases.
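
A minimal sketch of the guard I have in mind is below. The class name, 
constructor and suppliers are illustrative only, not the actual Flink code; the 
point is just the zero check:

{code:java}
import java.util.function.IntSupplier;
import org.apache.flink.metrics.Gauge;

// Illustrative sketch only; the wiring is an assumption, not the actual implementation.
public final class RequestedMemoryUsageGaugeSketch implements Gauge<Integer> {

    private final IntSupplier requestedSegments; // segments currently requested from the pool
    private final IntSupplier totalSegments;     // total segments of the NetworkBufferPool

    public RequestedMemoryUsageGaugeSketch(IntSupplier requestedSegments, IntSupplier totalSegments) {
        this.requestedSegments = requestedSegments;
        this.totalSegments = totalSegments;
    }

    @Override
    public Integer getValue() {
        int total = totalSegments.getAsInt();
        // A single-vertex job may legally be configured with 0 network memory,
        // so report 0% usage instead of dividing by zero.
        return total == 0 ? 0 : 100 * requestedSegments.getAsInt() / total;
    }
}
{code}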



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-08-25 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585177#comment-17585177
 ] 

Gen Luo commented on FLINK-26394:
-

Hi [~gezhiwei8899],
The issue exists in any version that has the source coordinator, but only in a 
few cases (e.g. a CDC source) will the bug actually happen. I'm working on 
picking this fix, along with the bugfix (FLINK-27148) for it, to 1.13, 1.14 and 
1.15, as I also mentioned on the pull request. Would you please take a look at 
the PRs and verify whether they solve your problem?

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> We found a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag is CheckpointCoordinator is true while no 
> checkpoint is actually doing, and the root cause is as following:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may cost more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and 
> `checkpointCoordinator` task of the first checkpoint will be queued after it.
>  # 10min later, the checkpoint is expired, removing the pending checkpoint 
> from the coordinator, and triggering a global failover. But the 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, which is now holding only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job failover, and the RecreateOnResetOperatorCoordinator will 
> recreate a new SourceCoordinator, and close the previous coordinator 
> asynchronously. Timeout for the closing is fixed to 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor then `awaitTermination`. If 
> the tasks are done within 60s, nothing wrong will happen.
>  # But if the closing method is stuck for more than 60s (which in this case 
> is actually stuck in the `handleSplitRequest`), the async closing thread will 
> be interrupted and SourceCoordinator will `shutdownNow` the executor. All 
> tasks queuing will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator is recently refactored. But 
> I find the new implementation also has this issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to encounter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-08-26 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585199#comment-17585199
 ] 

Gen Luo commented on FLINK-26394:
-

[~gezhiwei8899],
It's expected because, although the fix was merged to release-1.15 with 
9fc89a05 on 2022/4/6, it was reverted with 0c718666 on 2022/4/14 and has not 
been added back to release-1.15 since. At present the fix (31222b9a) and its 
follow-up fix (2c608b8e) are only on the master branch. I'm backporting the 
changes to release-1.15 now in pull request 
[20683|https://github.com/apache/flink/pull/20683].

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> We found a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag is CheckpointCoordinator is true while no 
> checkpoint is actually doing, and the root cause is as following:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may cost more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and 
> `checkpointCoordinator` task of the first checkpoint will be queued after it.
>  # 10min later, the checkpoint is expired, removing the pending checkpoint 
> from the coordinator, and triggering a global failover. But the 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, which is now holding only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job failover, and the RecreateOnResetOperatorCoordinator will 
> recreate a new SourceCoordinator, and close the previous coordinator 
> asynchronously. Timeout for the closing is fixed to 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor then `awaitTermination`. If 
> the tasks are done within 60s, nothing wrong will happen.
>  # But if the closing method is stuck for more than 60s (which in this case 
> is actually stuck in the `handleSplitRequest`), the async closing thread will 
> be interrupted and SourceCoordinator will `shutdownNow` the executor. All 
> tasks queuing will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator is recently refactored. But 
> I find the new implementation also has this issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to encounter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29132) SubtaskMetricStore causes memory leak.

2022-08-31 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598314#comment-17598314
 ] 

Gen Luo commented on FLINK-29132:
-

[~nagist] 
Thanks a lot for reporting the issue! 
And thanks [~chesnay] and [~mayuehappy] for taking care of it! 
I'll fix the issue as soon as possible. 

> SubtaskMetricStore causes memory leak.
> --
>
> Key: FLINK-29132
> URL: https://issues.apache.org/jira/browse/FLINK-29132
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Reporter: Sitan Pang
>Priority: Blocker
> Attachments: dump-attempts.png, dump-metricStore.png, 
> image-2022-08-30-11-56-34-608.png, image-2022-08-30-11-57-59-325.png
>
>
> In [FLINK-28588], MetricStore supports multiple attempts of a subtask. 
> However, `SubtaskMetricStore` doesn't have a clean mechanism.  In failover 
> scenario, `attempts` pile up and cause OOM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27148) UnalignedCheckpointITCase fails on AZP

2022-04-13 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521505#comment-17521505
 ] 

Gen Luo commented on FLINK-27148:
-

I'm working on the issue now. The solution seems to be simple, but I'm thinking 
about how to verify the change with a UT, which may take some time.

> UnalignedCheckpointITCase fails on AZP
> --
>
> Key: FLINK-27148
> URL: https://issues.apache.org/jira/browse/FLINK-27148
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Priority: Blocker
> Fix For: 1.16.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=5812]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=baf26b34-3c6a-54e8-f93f-cf269b32f802&t=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9&l=6018]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34448&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=41655]
> Relevant error message:
>  
> {code:java}
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, 
> already marked for checkpoint 11
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
>  {code}
>  
> {code:java}
> [ERROR] Tests run: 22, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
> 174.732 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
> [ERROR] UnalignedCheckpointITCase.execute  Time elapsed: 6.408 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>   at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:184)
>   at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:287)
>   at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 

[jira] [Commented] (FLINK-31655) Adaptive Channel selection for partitioner

2023-04-13 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711755#comment-17711755
 ] 

Gen Luo commented on FLINK-31655:
-

Hi [~akalash], thanks for the reminder.
I agree that the impact on normal rebalance must be measured, though I wonder 
why the implementation in the pull request would have a significant impact. 
Hopefully we can implement it in a more controlled way this time and make sure 
the impact is acceptable.

On the other hand, in my opinion, if the adaptive rebalance causes little 
performance regression, maybe we can make it 'suggested' or even 'default', 
unless users need all subtasks to process exactly the same number of records.

Hi [~tartarus],
Thanks for the doc! As we discussed offline, we'd better turn the issue into a 
FLIP and raise a formal discussion on the mailing list. As others mentioned, 
the feature was discussed before and faced quite some problems. I think we need 
a formal proposal with concrete implementation plans, and then benchmark 
results from a POC version covering both the performance gain in applicable 
scenarios and the impact on normal rebalance.
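
To make the channel-selection idea more concrete for such a discussion, a 
load-aware selector could look roughly like the sketch below. This is purely an 
illustration of the concept, not the implementation in the pull request; in 
particular, the per-channel backlog probe is an assumption:

{code:java}
import java.util.function.IntUnaryOperator;

// Concept sketch only: prefer the output channel with the smallest observed backlog,
// falling back to round-robin order when no channel is strictly less loaded.
public final class LoadAwareChannelSelectorSketch {

    private final int numberOfChannels;
    private final IntUnaryOperator backlogOfChannel; // hypothetical per-channel backlog probe
    private int nextRoundRobin = 0;

    public LoadAwareChannelSelectorSketch(int numberOfChannels, IntUnaryOperator backlogOfChannel) {
        this.numberOfChannels = numberOfChannels;
        this.backlogOfChannel = backlogOfChannel;
    }

    public int selectChannel() {
        int candidate = nextRoundRobin;
        nextRoundRobin = (nextRoundRobin + 1) % numberOfChannels;

        int best = candidate;
        int bestBacklog = backlogOfChannel.applyAsInt(candidate);
        for (int channel = 0; channel < numberOfChannels; channel++) {
            int backlog = backlogOfChannel.applyAsInt(channel);
            if (backlog < bestBacklog) {
                best = channel;
                bestBacklog = backlog;
            }
        }
        return best;
    }
}
{code}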

> Adaptive Channel selection for partitioner
> --
>
> Key: FLINK-31655
> URL: https://issues.apache.org/jira/browse/FLINK-31655
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: tartarus
>Assignee: tartarus
>Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the 
> same, then by default the RebalancePartitioner will be used to select the 
> target channel.
> In our company, users often use flink to access redis, hbase or other rpc 
> services, If some of the Operators are slow to return requests (for external 
> service reasons), then because Rebalance/Rescale are Round-Robin the Channel 
> selection policy, so the job is easy to backpressure.
> Because the Rebalance/Rescale policy does not care which subtask the data is 
> sent to downstream, so we expect Rebalance/Rescale to refer to the processing 
> power of the downstream subtask when choosing a Channel.
> Send more data to the free subtask, this ensures the best possible throughput 
> of job!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-02-28 Thread Gen Luo (Jira)
Gen Luo created FLINK-26394:
---

 Summary: CheckpointCoordinator.isTriggering can not be reset if a 
checkpoint expires while the checkpointCoordinator task is queuing in the 
SourceCoordinator executor.
 Key: FLINK-26394
 URL: https://issues.apache.org/jira/browse/FLINK-26394
 Project: Flink
  Issue Type: Bug
Reporter: Gen Luo


We found that a job can no longer trigger checkpoints or savepoints after 
recovering from a checkpoint timeout failure. After investigation, we found 
that the `isTriggering` flag in CheckpointCoordinator is true while no 
checkpoint is actually running, and the root cause is as follows:

 # The job uses a source whose coordinator needs to scan a table while 
requesting splits, which may take more than 10 min. The source coordinator 
executor thread will be occupied by `handleSplitRequest`, and the 
`checkpointCoordinator` task of the first checkpoint will be queued after it.
 # 10 min later, the checkpoint expires, removing the pending checkpoint from 
the coordinator and triggering a global failover. But `isTriggering` is not 
reset here. It can only be reset after the checkpoint completable future is 
done, and that future is now held only by the `checkpointCoordinator` task in 
the queue, along with the PendingCheckpoint.
 # Then the job fails over, and the RecreateOnResetOperatorCoordinator 
recreates a new SourceCoordinator and closes the previous coordinator 
asynchronously. The timeout for the closing is fixed at 60s. SourceCoordinator 
will try to `shutdown` the coordinator executor and then `awaitTermination`. If 
the tasks finish within 60s, nothing wrong happens.
 # But if the closing method is stuck for more than 60s (in this case it is 
actually stuck in `handleSplitRequest`), the async closing thread will be 
interrupted and SourceCoordinator will `shutdownNow` the executor. All queued 
tasks will be discarded, including the `checkpointCoordinator` task.
 # Then the checkpoint completable future will never complete and the 
`isTriggering` flag will never be reset.

I see that the closing part of SourceCoordinator was recently refactored, but 
the new implementation also has this issue. And since it calls `shutdownNow` 
directly, the issue should be even easier to encounter.
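
The hang itself is not Flink-specific. A minimal, self-contained illustration 
of steps 4 and 5 (hypothetical names, only to show the mechanism):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal illustration: a future that only a queued task can complete will hang
// forever once shutdownNow() discards that task.
public class DiscardedTaskHangDemo {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Occupy the single worker thread, like a long-running handleSplitRequest.
        executor.submit(() -> sleepQuietly(10_000L));

        CompletableFuture<String> checkpointFuture = new CompletableFuture<>();
        // The only code that would ever complete the future is this queued task,
        // like the queued `checkpointCoordinator` task.
        executor.submit(() -> checkpointFuture.complete("done"));

        // shutdownNow() interrupts the running task and discards the queued one,
        // so nothing can complete the future anymore.
        executor.shutdownNow();
        System.out.println("completed? " + checkpointFuture.isDone()); // prints false
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}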



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-03-01 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499474#comment-17499474
 ] 

Gen Luo commented on FLINK-26394:
-

[~yunta] The problem can be reproduced by:
 # adding a 10s sleep (or 120s for the older version) in the RequestSplitEvent 
processing branch of SourceCoordinator.handleEventFromOperator (see the sketch 
after this list). This imitates the behavior of enumerator.handleSplitRequest 
taking too long.
 # setting the checkpoint timeout to 2s for 
FileSourceTextLinesITCase.testContinuousTextFileSource
 # running the test FileSourceTextLinesITCase.testContinuousTextFileSource 
(with FailoverType=NONE)
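
The delay injection in step 1 is nothing more than the following; the exact 
shape of the RequestSplitEvent branch in SourceCoordinator#handleEventFromOperator 
is an assumption here, only the idea matters:

{code:java}
// Inside the branch of SourceCoordinator#handleEventFromOperator that handles
// RequestSplitEvent (the surrounding code shape is assumed):
if (event instanceof RequestSplitEvent) {
    try {
        // Imitate enumerator.handleSplitRequest taking too long.
        Thread.sleep(10_000L); // use 120_000L for the older version
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // ... original handling of the split request continues here ...
}
{code}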

 

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>Reporter: Gen Luo
>Priority: Major
>
> We found a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag is CheckpointCoordinator is true while no 
> checkpoint is actually doing, and the root cause is as following:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may cost more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and 
> `checkpointCoordinator` task of the first checkpoint will be queued after it.
>  # 10min later, the checkpoint is expired, removing the pending checkpoint 
> from the coordinator, and triggering a global failover. But the 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, which is now holding only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job failover, and the RecreateOnResetOperatorCoordinator will 
> recreate a new SourceCoordinator, and close the previous coordinator 
> asynchronously. Timeout for the closing is fixed to 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor then `awaitTermination`. If 
> the tasks are done within 60s, nothing wrong will happen.
>  # But if the closing method is stuck for more than 60s (which in this case 
> is actually stuck in the `handleSplitRequest`), the async closing thread will 
> be interrupted and SourceCoordinator will `shutdownNow` the executor. All 
> tasks queuing will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator is recently refactored. But 
> I find the new implementation also has this issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to encounter.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-03-01 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499474#comment-17499474
 ] 

Gen Luo edited comment on FLINK-26394 at 3/1/22, 11:29 AM:
---

[~yunta] The problem can be reproduced by:
 # adding a 10s sleep (or 120s for the older version) in the RequestSplitEvent 
processing branch of SourceCoordinator.handleEventFromOperator. This imitates 
the behavior of enumerator.handleSplitRequest taking too long.
 # setting the checkpoint timeout to 2s for 
FileSourceTextLinesITCase.testContinuousTextFileSource
 # running the test FileSourceTextLinesITCase.testContinuousTextFileSource 
(with FailoverType=NONE)

 


was (Author: pltbkd):
[~yunta] The problem can be reproduced by:
 # add a 10s sleeping (or 120s for the elder version) in the RequestSplitEvent 
processing branch in SourceCoordinator.handleEventFromOperator. This is 
imitating the behavior that enumerator.handleSplitRequest takes too long.
 # set the checkpoint timeout to 2s for 
FileSourceTextLinesITCase.testContinuousTextFileSource
 # run the test FileSourceTextLinesITCase.testContinuousTextFileSource (with 
FailoverType=NONE)

 

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>Reporter: Gen Luo
>Priority: Major
>
> We found a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag is CheckpointCoordinator is true while no 
> checkpoint is actually doing, and the root cause is as following:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may cost more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and 
> `checkpointCoordinator` task of the first checkpoint will be queued after it.
>  # 10min later, the checkpoint is expired, removing the pending checkpoint 
> from the coordinator, and triggering a global failover. But the 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, which is now holding only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job failover, and the RecreateOnResetOperatorCoordinator will 
> recreate a new SourceCoordinator, and close the previous coordinator 
> asynchronously. Timeout for the closing is fixed to 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor then `awaitTermination`. If 
> the tasks are done within 60s, nothing wrong will happen.
>  # But if the closing method is stuck for more than 60s (which in this case 
> is actually stuck in the `handleSplitRequest`), the async closing thread will 
> be interrupted and SourceCoordinator will `shutdownNow` the executor. All 
> tasks queuing will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator is recently refactored. But 
> I find the new implementation also has this issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to encounter.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26440) CompactorOperatorStateHandler can not work with unaligned checkpoint

2022-03-01 Thread Gen Luo (Jira)
Gen Luo created FLINK-26440:
---

 Summary: CompactorOperatorStateHandler can not work with unaligned 
checkpoint
 Key: FLINK-26440
 URL: https://issues.apache.org/jira/browse/FLINK-26440
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Gen Luo


As mentioned in FLINK-26314, CompactorOperatorStateHandler cannot work with 
unaligned checkpoints currently. Though FLINK-26314 is actually caused by 
another issue in the writer, we should still fix this one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26322) Test FileSink compaction manually

2022-03-09 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503975#comment-17503975
 ] 

Gen Luo commented on FLINK-26322:
-

Thanks for the testing, [~alexanderpreuss]!

Scenario 3 has been addressed. It's an issue in CompactCoordinatorStateHandler: 
it doesn't properly handle the cleanup-in-progress requests remaining in the 
state, but submits them as compacting requests instead. I'll fix it as soon as 
possible.

> Test FileSink compaction manually
> -
>
> Key: FLINK-26322
> URL: https://issues.apache.org/jira/browse/FLINK-26322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Alexander Preuss
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> Documentation of compaction on FileSink: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction]
> Possible scenarios might include
>  # Enable compaction with file-size based compaction strategy.
>  # Enable compaction with number-checkpoints based compaction strategy.
>  # Enable compaction, stop-with-savepoint and restarted with compaction 
> disabled.
>  # Disable compaction, stop-with-savepoint and restarted with compaction 
> enabled.
> For each scenario, it might need to verify that
>  # No repeat and missed records.
>  # The resulted files' size exceeds the specified condition.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26564) CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests.

2022-03-09 Thread Gen Luo (Jira)
Gen Luo created FLINK-26564:
---

 Summary: CompactCoordinatorStateHandler doesn't properly handle 
the cleanup-in-progress requests.
 Key: FLINK-26564
 URL: https://issues.apache.org/jira/browse/FLINK-26564
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Gen Luo
 Fix For: 1.15.0


It was found in FLINK-26322 that the CompactCoordinatorStateHandler doesn't 
properly handle the cleanup-in-progress requests but submits them as compacting 
requests. The issue happens when a job with compaction enabled is stopped with 
a savepoint and restarted with compaction disabled.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26580) FileSink CompactCoordinator add illegal committable as toCompacted.

2022-03-10 Thread Gen Luo (Jira)
Gen Luo created FLINK-26580:
---

 Summary: FileSink CompactCoordinator add illegal committable as 
toCompacted.
 Key: FLINK-26580
 URL: https://issues.apache.org/jira/browse/FLINK-26580
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Gen Luo
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26580) FileSink Compactor is not properly processing in-progress files.

2022-03-10 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-26580:

Summary: FileSink Compactor is not properly processing in-progress files.  
(was: FileSink CompactCoordinator add illegal committable as toCompacted.)

> FileSink Compactor is not properly processing in-progress files.
> 
>
> Key: FLINK-26580
> URL: https://issues.apache.org/jira/browse/FLINK-26580
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Blocker
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26580) FileSink Compactor state handlers are not properly processing remaining in-progress files.

2022-03-10 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-26580:

Summary: FileSink Compactor state handlers are not properly processing 
remaining in-progress files.  (was: FileSink Compactor is not properly 
processing in-progress files.)

> FileSink Compactor state handlers are not properly processing remaining 
> in-progress files.
> --
>
> Key: FLINK-26580
> URL: https://issues.apache.org/jira/browse/FLINK-26580
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Blocker
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26580) FileSink Compactor is not properly processing in-progress files.

2022-03-10 Thread Gen Luo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gen Luo updated FLINK-26580:

Summary: FileSink Compactor is not properly processing in-progress files.  
(was: FileSink Compactor state handlers are not properly processing remaining 
in-progress files.)

> FileSink Compactor is not properly processing in-progress files.
> 
>
> Key: FLINK-26580
> URL: https://issues.apache.org/jira/browse/FLINK-26580
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Blocker
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26322) Test FileSink compaction manually

2022-03-10 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504708#comment-17504708
 ] 

Gen Luo commented on FLINK-26322:
-

I found issues that may relate to scenario 4. The Jira is FLINK-26580 and a PR 
has been created.

It seems that the compactor is not properly processing the in-progress files, 
but we failed to notice it since the test case uses a rolling policy that 
force-flushes the in-progress files before checkpointing. I changed this policy 
and two issues arose, both of which are fixed in the newly created PR.
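
For context, the difference between the two behaviors is roughly the following; 
this is an illustrative FileSink configuration, not the exact IT case code:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// With OnCheckpointRollingPolicy every in-progress file is rolled on each checkpoint,
// so the compactor never sees in-progress files and the bug stays hidden.
FileSink<String> rolledOnCheckpoint =
        FileSink.forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

// With a size/time based policy, in-progress files can survive a checkpoint and the
// compactor has to handle them, which is what exposed the issues fixed in FLINK-26580.
FileSink<String> rolledBySizeAndTime =
        FileSink.forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>())
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .build();
{code}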

> Test FileSink compaction manually
> -
>
> Key: FLINK-26322
> URL: https://issues.apache.org/jira/browse/FLINK-26322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Alexander Preuss
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> Documentation of compaction on FileSink: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction]
> Possible scenarios might include
>  # Enable compaction with file-size based compaction strategy.
>  # Enable compaction with number-checkpoints based compaction strategy.
>  # Enable compaction, stop-with-savepoint and restarted with compaction 
> disabled.
>  # Disable compaction, stop-with-savepoint and restarted with compaction 
> enabled.
> For each scenario, it might need to verify that
>  # No repeat and missed records.
>  # The resulted files' size exceeds the specified condition.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26610) FileSink can not upgrade from 1.13 if the uid of the origin sink is not set.

2022-03-11 Thread Gen Luo (Jira)
Gen Luo created FLINK-26610:
---

 Summary: FileSink can not upgrade from 1.13 if the uid of the 
origin sink is not set.
 Key: FLINK-26610
 URL: https://issues.apache.org/jira/browse/FLINK-26610
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Gen Luo
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26322) Test FileSink compaction manually

2022-03-13 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506013#comment-17506013
 ] 

Gen Luo commented on FLINK-26322:
-

Now all known issues are resolved and the patches are merged.

I have tried the tests with the latest version and everything seems OK now. 
Thanks [~alexanderpreuss] again for the effort, which helped us find and 
resolve the issues!

> Test FileSink compaction manually
> -
>
> Key: FLINK-26322
> URL: https://issues.apache.org/jira/browse/FLINK-26322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Alexander Preuss
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> Documentation of compaction on FileSink: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction]
> Possible scenarios might include
>  # Enable compaction with file-size based compaction strategy.
>  # Enable compaction with number-checkpoints based compaction strategy.
>  # Enable compaction, stop-with-savepoint and restarted with compaction 
> disabled.
>  # Disable compaction, stop-with-savepoint and restarted with compaction 
> enabled.
> For each scenario, it might need to verify that
>  # No repeat and missed records.
>  # The resulted files' size exceeds the specified condition.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-03-14 Thread Gen Luo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506047#comment-17506047
 ] 

Gen Luo commented on FLINK-26394:
-

I agree with you, [~becket_qin].

IMO the CheckpointCoordinator should always hold ownership of the futures and 
handle the exceptions as a last resort, since OperatorCoordinator 
implementations can vary.

Still, since the SourceCoordinator is under our control, and the problem may 
also affect futures that other modules register with the SourceCoordinator, I 
think the SourceCoordinator should also handle the futures in such cases.
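
To make that concrete, the kind of handling I have in mind looks roughly like 
the sketch below. It assumes the coordinator (or whoever owns the executor) 
keeps track of the futures that queued tasks are supposed to complete; it is 
not the actual patch:

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;

// Sketch only: fail every future that a discarded task can no longer complete when
// the coordinator executor is forcibly shut down, so callers such as the
// CheckpointCoordinator can reset their state (e.g. the isTriggering flag).
public final class FutureCompletingShutdownSketch {

    private final List<CompletableFuture<?>> pendingFutures = new CopyOnWriteArrayList<>();

    public <T> CompletableFuture<T> register(CompletableFuture<T> future) {
        pendingFutures.add(future);
        future.whenComplete((result, error) -> pendingFutures.remove(future));
        return future;
    }

    public void closeNow(ExecutorService coordinatorExecutor) {
        // Discards queued tasks, like SourceCoordinator's forced shutdown.
        coordinatorExecutor.shutdownNow();
        for (CompletableFuture<?> future : pendingFutures) {
            future.completeExceptionally(
                    new IllegalStateException("Coordinator executor was shut down"));
        }
    }
}
{code}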

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>Reporter: Gen Luo
>Priority: Major
>  Labels: pull-request-available
>
> We found a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag is CheckpointCoordinator is true while no 
> checkpoint is actually doing, and the root cause is as following:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may cost more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and 
> `checkpointCoordinator` task of the first checkpoint will be queued after it.
>  # 10min later, the checkpoint is expired, removing the pending checkpoint 
> from the coordinator, and triggering a global failover. But the 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, which is now holding only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job failover, and the RecreateOnResetOperatorCoordinator will 
> recreate a new SourceCoordinator, and close the previous coordinator 
> asynchronously. Timeout for the closing is fixed to 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor then `awaitTermination`. If 
> the tasks are done within 60s, nothing wrong will happen.
>  # But if the closing method is stuck for more than 60s (which in this case 
> is actually stuck in the `handleSplitRequest`), the async closing thread will 
> be interrupted and SourceCoordinator will `shutdownNow` the executor. All 
> tasks queuing will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator is recently refactored. But 
> I find the new implementation also has this issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to encounter.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)