This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5954b5b579c [FLINK-38757][runtime] Introduce the base class for
application
5954b5b579c is described below
commit 5954b5b579c6d4bad779d20b152eb6147b098ac6
Author: Yi Zhang <[email protected]>
AuthorDate: Wed Sep 24 11:08:27 2025 +0800
[FLINK-38757][runtime] Introduce the base class for application
---
.../org/apache/flink/api/common/ApplicationID.java | 110 ++++++++++
.../apache/flink/api/common/ApplicationState.java | 63 ++++++
.../runtime/application/AbstractApplication.java | 211 ++++++++++++++++++
.../application/AbstractApplicationTest.java | 240 +++++++++++++++++++++
4 files changed, 624 insertions(+)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java
b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java
new file mode 100644
index 00000000000..c7b0c8b7c54
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
+
+import java.nio.ByteBuffer;
+
+/** Unique (at least statistically unique) identifier for a Flink Application.
*/
+@PublicEvolving
+public final class ApplicationID extends AbstractID {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Creates a new (statistically) random ApplicationID. */
+ public ApplicationID() {
+ super();
+ }
+
+ /**
+ * Creates a new ApplicationID, using the given lower and upper parts.
+ *
+ * @param lowerPart The lower 8 bytes of the ID.
+ * @param upperPart The upper 8 bytes of the ID.
+ */
+ public ApplicationID(long lowerPart, long upperPart) {
+ super(lowerPart, upperPart);
+ }
+
+ /**
+ * Creates a new ApplicationID from the given byte sequence. The byte
sequence must be exactly
+ * 16 bytes long. The first eight bytes make up the lower part of the ID,
while the next 8 bytes
+ * make up the upper part of the ID.
+ *
+ * @param bytes The byte sequence.
+ */
+ public ApplicationID(byte[] bytes) {
+ super(bytes);
+ }
+
+ // ------------------------------------------------------------------------
+ // Static factory methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new (statistically) random ApplicationID.
+ *
+ * @return A new random ApplicationID.
+ */
+ public static ApplicationID generate() {
+ return new ApplicationID();
+ }
+
+ /**
+ * Creates a new ApplicationID from the given byte sequence. The byte
sequence must be exactly
+ * 16 bytes long. The first eight bytes make up the lower part of the ID,
while the next 8 bytes
+ * make up the upper part of the ID.
+ *
+ * @param bytes The byte sequence.
+ * @return A new ApplicationID corresponding to the ID encoded in the
bytes.
+ */
+ public static ApplicationID fromByteArray(byte[] bytes) {
+ return new ApplicationID(bytes);
+ }
+
+ public static ApplicationID fromByteBuffer(ByteBuffer buf) {
+ long lower = buf.getLong();
+ long upper = buf.getLong();
+ return new ApplicationID(lower, upper);
+ }
+
+ /**
+ * Parses an ApplicationID from the given string.
+ *
+ * @param hexString string representation of an ApplicationID
+ * @return Parsed ApplicationID
+ * @throws IllegalArgumentException if the ApplicationID could not be
parsed from the given
+ * string
+ */
+ public static ApplicationID fromHexString(String hexString) {
+ try {
+ return new ApplicationID(StringUtils.hexStringToByte(hexString));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Cannot parse ApplicationID from \""
+ + hexString
+ + "\". The expected format is "
+ + "[0-9a-fA-F]{32}, e.g.
fd72014d4c864993a2e5a9287b4a9c5d.",
+ e);
+ }
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
new file mode 100644
index 00000000000..b5b7f5db04d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DeploymentOptions;
+
+/** Possible states of an application. */
+@PublicEvolving
+public enum ApplicationState {
+
+ /** The application is newly created and has not started running. */
+ CREATED(false),
+
+ /** The application has started running. */
+ RUNNING(false),
+
+ /** The application has encountered a failure and is waiting for the
cleanup to complete. */
+ FAILING(false),
+
+ /** The application has failed due to an exception. */
+ FAILED(true),
+
+ /** The application is being canceled. */
+ CANCELING(false),
+
+ /** The application has been canceled. */
+ CANCELED(true),
+
+ /**
+ * All jobs in the application have completed, See {@link
+ * DeploymentOptions#TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION} for more
information.
+ */
+ FINISHED(true);
+
+ //
--------------------------------------------------------------------------------------------
+
+ private final boolean terminalState;
+
+ ApplicationState(boolean terminalState) {
+ this.terminalState = terminalState;
+ }
+
+ public boolean isTerminalState() {
+ return terminalState;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
new file mode 100644
index 00000000000..94a0621fa40
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** Base class for all applications. */
+public abstract class AbstractApplication implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractApplication.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final ApplicationID applicationId;
+
+ private ApplicationState applicationState;
+
+ /**
+ * Timestamps (in milliseconds as returned by {@code
System.currentTimeMillis()}) when the
+ * application transitioned into a certain status. The index into this
array is the ordinal of
+ * the enum value, i.e. the timestamp when the application went into state
"RUNNING" is at
+ * {@code timestamps[RUNNING.ordinal()]}.
+ */
+ private final long[] statusTimestamps;
+
+ private final Set<JobID> jobs = new HashSet<>();
+
+ public AbstractApplication(ApplicationID applicationId) {
+ this.applicationId = applicationId;
+ this.statusTimestamps = new long[ApplicationState.values().length];
+ this.applicationState = ApplicationState.CREATED;
+ this.statusTimestamps[ApplicationState.CREATED.ordinal()] =
System.currentTimeMillis();
+ }
+
+ /**
+ * Entry method to run the application asynchronously.
+ *
+ * <p>The returned CompletableFuture indicates that the execution request
has been accepted and
+ * the application transitions to RUNNING state.
+ *
+ * <p><b>Note:</b> This method must be called in the main thread of the
{@link Dispatcher}.
+ *
+ * @param dispatcherGateway the dispatcher of the cluster to run the
application.
+ * @param scheduledExecutor the executor to run the user logic.
+ * @param mainThreadExecutor the executor bound to the main thread.
+ * @param errorHandler the handler for fatal errors.
+ * @return a future indicating that the execution request has been
accepted.
+ */
+ public abstract CompletableFuture<Acknowledge> execute(
+ final DispatcherGateway dispatcherGateway,
+ final ScheduledExecutor scheduledExecutor,
+ final Executor mainThreadExecutor,
+ final FatalErrorHandler errorHandler);
+
+ /**
+ * Cancels the application execution.
+ *
+ * <p>This method is responsible for initiating the cancellation process
and handling the
+ * appropriate state transitions of the application.
+ *
+ * <p><b>Note:</b> This method must be called in the main thread of the
{@link Dispatcher}.
+ */
+ public abstract void cancel();
+
+ /**
+ * Cleans up execution associated with the application.
+ *
+ * <p>This method is typically invoked when the cluster is shutting down.
+ */
+ public abstract void dispose();
+
+ public abstract String getName();
+
+ public ApplicationID getApplicationId() {
+ return applicationId;
+ }
+
+ public Set<JobID> getJobs() {
+ return Collections.unmodifiableSet(jobs);
+ }
+
+ /**
+ * Adds a job ID to the jobs set.
+ *
+ * <p><b>Note:</b>This method must be called in the main thread of the
{@link Dispatcher}.
+ */
+ public boolean addJob(JobID jobId) {
+ return jobs.add(jobId);
+ }
+
+ public ApplicationState getApplicationStatus() {
+ return applicationState;
+ }
+
+ // ------------------------------------------------------------------------
+ // State Transitions
+ // ------------------------------------------------------------------------
+
+ private static final Map<ApplicationState, Set<ApplicationState>>
ALLOWED_TRANSITIONS;
+
+ static {
+ ALLOWED_TRANSITIONS = new EnumMap<>(ApplicationState.class);
+ ALLOWED_TRANSITIONS.put(
+ ApplicationState.CREATED,
+ new HashSet<>(Arrays.asList(ApplicationState.RUNNING,
ApplicationState.CANCELING)));
+ ALLOWED_TRANSITIONS.put(
+ ApplicationState.RUNNING,
+ new HashSet<>(
+ Arrays.asList(
+ ApplicationState.FINISHED,
+ ApplicationState.FAILING,
+ ApplicationState.CANCELING)));
+ ALLOWED_TRANSITIONS.put(
+ ApplicationState.FAILING,
+ new
HashSet<>(Collections.singletonList(ApplicationState.FAILED)));
+ ALLOWED_TRANSITIONS.put(
+ ApplicationState.CANCELING,
+ new
HashSet<>(Collections.singletonList(ApplicationState.CANCELED)));
+ }
+
+ /** All state transition methods must be called in the main thread. */
+ public void transitionToRunning() {
+ transitionState(ApplicationState.RUNNING);
+ }
+
+ /** All state transition methods must be called in the main thread. */
+ public void transitionToCanceling() {
+ transitionState(ApplicationState.CANCELING);
+ }
+
+ /** All state transition methods must be called in the main thread. */
+ public void transitionToFailing() {
+ transitionState(ApplicationState.FAILING);
+ }
+
+ /** All state transition methods must be called in the main thread. */
+ public void transitionToFailed() {
+ transitionState(ApplicationState.FAILED);
+ }
+
+ /** All state transition methods must be called in the main thread. */
+ public void transitionToFinished() {
+ transitionState(ApplicationState.FINISHED);
+ }
+
+ /** All state transition methods must be called in the main thread. */
+ public void transitionToCanceled() {
+ transitionState(ApplicationState.CANCELED);
+ }
+
+ void transitionState(ApplicationState targetState) {
+ validateTransition(targetState);
+ LOG.info(
+ "Application {} ({}) switched from state {} to {}.",
+ getName(),
+ getApplicationId(),
+ applicationState,
+ targetState);
+ this.statusTimestamps[targetState.ordinal()] =
System.currentTimeMillis();
+ this.applicationState = targetState;
+ }
+
+ private void validateTransition(ApplicationState targetState) {
+ Set<ApplicationState> allowedTransitions =
ALLOWED_TRANSITIONS.get(applicationState);
+ if (allowedTransitions == null ||
!allowedTransitions.contains(targetState)) {
+ throw new IllegalStateException(
+ String.format(
+ "Invalid transition from %s to %s",
applicationState, targetState));
+ }
+ }
+
+ public long getStatusTimestamp(ApplicationState status) {
+ return this.statusTimestamps[status.ordinal()];
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
new file mode 100644
index 00000000000..ef609e41c3e
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link AbstractApplication}. */
+public class AbstractApplicationTest {
+
+ @Test
+ void testInitialization() {
+ ApplicationID applicationID = new ApplicationID();
+ long start = System.currentTimeMillis();
+ AbstractApplication application = new MockApplication(applicationID);
+ long end = System.currentTimeMillis();
+
+ assertEquals(applicationID, application.getApplicationId());
+
+ assertEquals(ApplicationState.CREATED,
application.getApplicationStatus());
+
+ long ts = application.getStatusTimestamp(ApplicationState.CREATED);
+ assertTrue(start <= ts && ts <= end);
+ }
+
+ @Test
+ void testAddJob() {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ JobID jobId = JobID.generate();
+
+ boolean added = application.addJob(jobId);
+ assertTrue(added);
+ assertEquals(1, application.getJobs().size());
+ assertTrue(application.getJobs().contains(jobId));
+
+ added = application.addJob(jobId);
+ assertFalse(added);
+ assertEquals(1, application.getJobs().size());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"RUNNING", "CANCELING"})
+ void testTransitionFromCreated(ApplicationState targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+
+ long start = System.currentTimeMillis();
+ application.transitionState(targetState);
+ long end = System.currentTimeMillis();
+
+ assertEquals(targetState, application.getApplicationStatus());
+
+ long ts = application.getStatusTimestamp(targetState);
+ assertTrue(start <= ts && ts <= end);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"CREATED", "FINISHED", "FAILING", "CANCELED", "FAILED"})
+ void testTransitionFromCreatedToUnsupportedStates(ApplicationState
targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"FINISHED", "FAILING", "CANCELING"})
+ void testTransitionFromRunning(ApplicationState targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToRunning();
+
+ long start = System.currentTimeMillis();
+ application.transitionState(targetState);
+ long end = System.currentTimeMillis();
+
+ assertEquals(targetState, application.getApplicationStatus());
+
+ long ts = application.getStatusTimestamp(targetState);
+ assertTrue(start <= ts && ts <= end);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"CREATED", "RUNNING", "FAILED", "CANCELED"})
+ void testTransitionFromRunningToUnsupportedStates(ApplicationState
targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToRunning();
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ @Test
+ void testTransitionFromCanceling() {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToCanceling();
+
+ long start = System.currentTimeMillis();
+ application.transitionToCanceled();
+ long end = System.currentTimeMillis();
+
+ assertEquals(ApplicationState.CANCELED,
application.getApplicationStatus());
+
+ long ts = application.getStatusTimestamp(ApplicationState.CANCELED);
+ assertTrue(start <= ts && ts <= end);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"CREATED", "RUNNING", "CANCELING", "FAILING", "FINISHED",
"FAILED"})
+ void testTransitionFromCancelingToUnsupportedStates(ApplicationState
targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToCanceling();
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ @Test
+ void testTransitionFromFailing() {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToRunning();
+ application.transitionToFailing();
+
+ long start = System.currentTimeMillis();
+ application.transitionToFailed();
+ long end = System.currentTimeMillis();
+
+ assertEquals(ApplicationState.FAILED,
application.getApplicationStatus());
+
+ long ts = application.getStatusTimestamp(ApplicationState.FAILED);
+ assertTrue(start <= ts && ts <= end);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"CREATED", "RUNNING", "CANCELING", "FAILING", "FINISHED",
"CANCELED"})
+ void testTransitionFromFailingToUnsupportedStates(ApplicationState
targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToRunning();
+ application.transitionToFailing();
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ApplicationState.class)
+ void testTransitionFromFinished(ApplicationState targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToRunning();
+ application.transitionToFinished();
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ApplicationState.class)
+ void testTransitionFromCanceled(ApplicationState targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToCanceling();
+ application.transitionToCanceled();
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ApplicationState.class)
+ void testTransitionFromFailed(ApplicationState targetState) {
+ AbstractApplication application = new MockApplication(new
ApplicationID());
+ application.transitionToRunning();
+ application.transitionToFailing();
+ application.transitionToFailed();
+
+ assertThrows(IllegalStateException.class, () ->
application.transitionState(targetState));
+ }
+
+ private static class MockApplication extends AbstractApplication {
+ public MockApplication(ApplicationID applicationId) {
+ super(applicationId);
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> execute(
+ DispatcherGateway dispatcherGateway,
+ ScheduledExecutor scheduledExecutor,
+ Executor mainThreadExecutor,
+ FatalErrorHandler errorHandler) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public void dispose() {}
+
+ @Override
+ public String getName() {
+ return "Mock Application";
+ }
+ }
+}