[ https://issues.apache.org/jira/browse/FLINK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Panagiotis Garefalakis updated FLINK-33121:
-------------------------------------------
    Description:

{{JobExceptionsHandler#createRootExceptionInfo}} assumes that *Global* failures (with a null task name) may *only* be root exceptions (the job is considered FAILED when one occurs, and no further exceptions are captured), whereas *Local/Task* failures may be part of the concurrent exceptions list. If this precondition is violated, an assertion error is thrown from {{assertLocalExceptionInfo}}.

The issue lies within the [convertFailures](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java#L422) logic, where we take the failureCollection pointer and convert it to a HistoryEntry. In more detail, we pass the first failure and a pointer to the remaining failures collection when creating the HistoryEntry, and then add the entry to the exception history.

In our specific scenario, a local failure comes in first: we call convertFailures, which creates a HistoryEntry and removes the local failure from the collection, while also passing a pointer to the now-empty failureCollection. Then a global failure comes in and, before it is converted, it is added to the (previously empty) failureCollection just before requestJob is served, which returns the list of history entries. As a result, the HistoryEntry of the local failure now has a concurrent exceptions collection containing a global failure, which should never happen and triggers the assertion.

A solution is to create a copy of the failureCollection during conversion instead of passing the pointer around (as I did in the updated PR). This PR also fixes a smaller bug where we don't pass the [taskName](https://github.com/apache/flink/pull/23440/files#diff-0c8b850bbd267631fbe04bb44d8bb3c7e87c3c6aabae904fabdb758026f7fa76R104) properly.
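To illustrate the aliasing problem described above, here is a minimal, self-contained Java sketch. It is not the Flink code: {{Failure}}, {{HistoryEntry}}, {{convertSharingReference}} and {{convertWithCopy}} are hypothetical stand-ins, and the real convertFailures logic is more involved. It only shows why handing the live collection to the entry lets a later global failure leak into the concurrent exceptions, and why a copy avoids that.

```java
import java.util.ArrayList;
import java.util.List;

class FailureAliasingSketch {

    /** Hypothetical stand-in for a failure; a null taskName marks a global failure. */
    static final class Failure {
        final String taskName;

        Failure(String taskName) {
            this.taskName = taskName;
        }

        boolean isGlobal() {
            return taskName == null;
        }
    }

    /** Hypothetical stand-in for a history entry: a root failure plus concurrent failures. */
    static final class HistoryEntry {
        final Failure root;
        final List<Failure> concurrent;

        HistoryEntry(Failure root, List<Failure> concurrent) {
            this.root = root;
            this.concurrent = concurrent;
        }
    }

    /** Problematic variant: the entry keeps a reference to the live, mutable collection. */
    static HistoryEntry convertSharingReference(List<Failure> failures) {
        Failure root = failures.remove(0);
        return new HistoryEntry(root, failures);
    }

    /** Copying variant: the entry gets a snapshot, so later additions cannot leak in. */
    static HistoryEntry convertWithCopy(List<Failure> failures) {
        Failure root = failures.remove(0);
        return new HistoryEntry(root, new ArrayList<>(failures));
    }

    /** Replays the scenario: local failure, conversion, then a late global failure. */
    static boolean globalLeaksIntoConcurrent(boolean copy) {
        List<Failure> failures = new ArrayList<>();
        failures.add(new Failure("task-1"));
        HistoryEntry entry = copy ? convertWithCopy(failures) : convertSharingReference(failures);
        // The global failure arrives after conversion but before the history is read.
        failures.add(new Failure(null));
        return entry.concurrent.stream().anyMatch(Failure::isGlobal);
    }

    public static void main(String[] args) {
        System.out.println("shared reference leaks: " + globalLeaksIntoConcurrent(false));
        System.out.println("copy leaks: " + globalLeaksIntoConcurrent(true));
    }
}
```

In this sketch the shared-reference variant ends up with a global failure inside the local entry's concurrent list, which is the state the handler's precondition rejects; the copying variant does not.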
Note: DefaultScheduler does not suffer from this issue, as it treats failures directly as HistoryEntries (no conversion step).

  was:

{{JobExceptionsHandler#createRootExceptionInfo}} assumes that *Global* failures (with a null task name) may *only* be root exceptions (the job is considered FAILED when one occurs, and no further exceptions are captured), whereas *Local/Task* failures may be part of the concurrent exceptions list. If this precondition is violated, an assertion error is thrown from {{assertLocalExceptionInfo}}.

However, in the existing AdaptiveScheduler logic, we always add both global and local failures at the *end* of the [failure collection list|https://github.com/confluentinc/flink/blob/b8482260622c14db00f9dc88bbf9e82233613235/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java#L338], and when converting them to history entries we *remove from the head* the [oldest failure exception|https://github.com/confluentinc/flink/blob/b8482260622c14db00f9dc88bbf9e82233613235/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java#L386]. As a result, when a task failure (first) is concurrent with a global failure (second, terminating the job), the global failure ends up in the concurrent exceptions list, violating the precondition.

Note: DefaultScheduler does not suffer from this issue, as it treats failures directly as HistoryEntries (no conversion step).

The solution is to add global failures only at the *head* of the list, as part of the handleGlobalFailure method, to ensure they end up as RootExceptionEntries.
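For comparison, the head-insertion idea from the earlier description can be sketched as follows. This is only an illustration under assumptions: {{HeadInsertionSketch}}, {{Failure}}, {{handleLocalFailure}}, {{pollRoot}} and {{remaining}} are hypothetical names, and only handleGlobalFailure is a name taken from the description above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class HeadInsertionSketch {

    /** Hypothetical stand-in for a failure; a null taskName marks a global failure. */
    static final class Failure {
        final String taskName;

        Failure(String taskName) {
            this.taskName = taskName;
        }

        boolean isGlobal() {
            return taskName == null;
        }
    }

    private final Deque<Failure> failures = new ArrayDeque<>();

    /** Local failures are appended at the tail, in arrival order. */
    void handleLocalFailure(String taskName) {
        failures.addLast(new Failure(taskName));
    }

    /** Global failures go to the head so that conversion picks them as the root. */
    void handleGlobalFailure() {
        failures.addFirst(new Failure(null));
    }

    /** Conversion removes the head of the collection and treats it as the root failure. */
    Failure pollRoot() {
        return failures.pollFirst();
    }

    /** Snapshot of the failures that would become the concurrent exceptions. */
    List<Failure> remaining() {
        return new ArrayList<>(failures);
    }
}
```

With a local failure followed by a global one, the head of the collection is the global failure, so it becomes the root and only the local failure remains as a concurrent exception.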
> Failed precondition in JobExceptionsHandler due to concurrent global failures
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-33121
>                 URL: https://issues.apache.org/jira/browse/FLINK-33121
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Panagiotis Garefalakis
>            Priority: Major
>              Labels: pull-request-available