(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart

2024-03-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20c506d76c99c2e6c3f30a039acd0366d3448c87
Author: Panagiotis Garefalakis 
AuthorDate: Wed Mar 27 22:23:48 2024 -0700

[FLINK-34922] Adds ITCase for GlobalFailureOnRestart

Add an ITCase where a global failure is triggered while the scheduler is
restarting, and assert that this failure is handled such that it can be
retrieved via the REST API.
---
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 197 +++--
 1 file changed, 183 insertions(+), 14 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index a4124dfe08c..d15f3ae7ef4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
@@ -30,9 +33,20 @@ import 
org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -48,8 +62,10 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +78,13 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -266,25 +284,78 @@ public class AdaptiveSchedulerITCase extends TestLogger {
 final JobClient jobClient = env.executeAsync();
 CommonTestUtils.waitUntilCondition(
 () -> {
-final RestClusterClient restClusterClient =
-
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient();
-final JobExceptionsMessageParameters params =
-new JobExceptionsMessageParameters();
-params.jobPathParameter.resolve(jobClient.getJobID());
-final CompletableFuture 
exceptionsFuture =
-restClusterClient.sendRequest(
- 

(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart

2024-03-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b54edc886ce5a533bafe74fa3629657b6266cad5
Author: Panagiotis Garefalakis 
AuthorDate: Wed Mar 27 22:23:48 2024 -0700

[FLINK-34922] Adds ITCase for GlobalFailureOnRestart

Add an ITCase where a global failure is triggered while the scheduler is
restarting, and assert that this failure is handled such that it can be
retrieved via the REST API.
---
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 197 +++--
 1 file changed, 183 insertions(+), 14 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index 0cbfdd950bd..e63b2891039 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
@@ -30,9 +33,20 @@ import 
org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -48,8 +62,10 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +78,13 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -270,25 +288,78 @@ public class AdaptiveSchedulerITCase extends TestLogger {
 final JobClient jobClient = env.executeAsync();
 CommonTestUtils.waitUntilCondition(
 () -> {
-final RestClusterClient restClusterClient =
-
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient();
-final JobExceptionsMessageParameters params =
-new JobExceptionsMessageParameters();
-params.jobPathParameter.resolve(jobClient.getJobID());
-final CompletableFuture 
exceptionsFuture =
-restClusterClient.sendRequest(
- 

(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart

2024-03-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4c945cb9ca882ae485c2e58c74825938f154119
Author: Panagiotis Garefalakis 
AuthorDate: Wed Mar 27 22:23:48 2024 -0700

[FLINK-34922] Adds ITCase for GlobalFailureOnRestart

Add an ITCase where a global failure is triggered while the scheduler is
restarting, and assert that this failure is handled such that it can be
retrieved via the REST API.
---
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 197 +++--
 1 file changed, 183 insertions(+), 14 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index 2836ff82371..f6c31658f7d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
@@ -31,9 +34,20 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -48,8 +62,10 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +78,13 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -270,25 +288,78 @@ public class AdaptiveSchedulerITCase extends TestLogger {
 final JobClient jobClient = env.executeAsync();
 CommonTestUtils.waitUntilCondition(
 () -> {
-final RestClusterClient restClusterClient =
-
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient();
-final JobExceptionsMessageParameters params =
-new JobExceptionsMessageParameters();
-params.jobPathParameter.resolve(jobClient.getJobID());
-final CompletableFuture 
exceptionsFuture =
-restClusterClient.sendRequest(
-