This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLIP-410-update3
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60488e69e7aa2f772d9090a92447a876b1bec474
Author: Weijie Guo <res...@163.com>
AuthorDate: Mon Apr 8 17:25:39 2024 +0800

    [FLINK-34549][API] Implement JobInfo and expose it via RuntimeContext
---
 .../datastream/api/context/RuntimeContext.java     |  3 +-
 ...PartitionedContext.java => DefaultJobInfo.java} | 25 ++++++--
 .../impl/context/DefaultNonPartitionedContext.java | 11 ++++
 .../impl/context/DefaultRuntimeContext.java        | 13 +++-
 .../DefaultTwoOutputNonPartitionedContext.java     | 12 ++++
 .../datastream/impl/operators/ProcessOperator.java |  4 +-
 .../TwoInputBroadcastProcessOperator.java          |  4 +-
 .../TwoInputNonBroadcastProcessOperator.java       |  4 +-
 .../impl/operators/TwoOutputProcessOperator.java   |  4 +-
 .../impl/context/DefaultJobInfoTest.java           | 73 ++++++++++++++++++++++
 .../state/api/runtime/SavepointEnvironment.java    |  9 +++
 .../flink/runtime/execution/Environment.java       |  3 +
 .../DefaultExecutionGraphBuilder.java              |  3 +
 .../runtime/executiongraph/JobInformation.java     | 27 ++++++++
 .../runtime/taskmanager/RuntimeEnvironment.java    | 11 ++++
 .../org/apache/flink/runtime/taskmanager/Task.java |  6 ++
 .../operators/testutils/DummyEnvironment.java      |  7 +++
 .../operators/testutils/MockEnvironment.java       | 14 ++++-
 .../testutils/MockEnvironmentBuilder.java          | 15 +++++
 .../api/operators/StreamingRuntimeContext.java     |  5 ++
 .../runtime/tasks/StreamMockEnvironment.java       |  6 ++
 21 files changed, 241 insertions(+), 18 deletions(-)

diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
index 6a1af18b183..40476469b6b 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
@@ -28,5 +28,6 @@ import org.apache.flink.annotation.Experimental;
  */
 @Experimental
 public interface RuntimeContext {
-    // TODO Introduce related methods in the subsequent RP.
+    /** Get the {@link JobInfo} of this process function. */
+    JobInfo getJobInfo();
 }
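
A quick illustration of the accessor this hunk exposes, offered as a minimal sketch only: the onOpen method name and the way a process function obtains the RuntimeContext are hypothetical and not part of this commit, while getJobInfo(), getJobName(), getExecutionMode() and JobInfo.ExecutionMode come from the changes below.

    // Hypothetical hook; assumes the process function is handed the new RuntimeContext.
    void onOpen(RuntimeContext ctx) {
        JobInfo jobInfo = ctx.getJobInfo();
        // Branch on execution mode, e.g. to skip timer setup for bounded input.
        if (jobInfo.getExecutionMode() == JobInfo.ExecutionMode.BATCH) {
            // bounded-input specific setup
        }
        System.out.println("Running job: " + jobInfo.getJobName());
    }
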
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultJobInfo.java
similarity index 53%
copy from flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
copy to flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultJobInfo.java
index 017bdcb8e25..faac1a9c3f5 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultJobInfo.java
@@ -18,14 +18,27 @@
 
 package org.apache.flink.datastream.impl.context;
 
-import org.apache.flink.datastream.api.context.NonPartitionedContext;
-import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
+import org.apache.flink.datastream.api.context.JobInfo;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-/** The default implementation of {@link NonPartitionedContext}. */
-public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
+/** Default implementation of {@link JobInfo}. */
+public class DefaultJobInfo implements JobInfo {
+    private final StreamingRuntimeContext operatorContext;
+
+    public DefaultJobInfo(StreamingRuntimeContext streamingRuntimeContext) {
+        this.operatorContext = streamingRuntimeContext;
+    }
+
+    @Override
+    public String getJobName() {
+        return operatorContext.getJobInfo().getJobName();
+    }
 
     @Override
-    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
-        // TODO implements this method.
+    public ExecutionMode getExecutionMode() {
+        return operatorContext.getJobType() == JobType.STREAMING
+                ? ExecutionMode.STREAMING
+                : ExecutionMode.BATCH;
     }
 }
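
For readers without the flink-datastream-api sources at hand, this is the JobInfo surface that DefaultJobInfo appears to implement, reconstructed from this file and the test further below; the real interface lives in flink-datastream-api and may differ in annotations or carry additional members.

    // Sketch inferred from DefaultJobInfo and DefaultJobInfoTest; not the authoritative definition.
    public interface JobInfo {
        /** Name of the running job. */
        String getJobName();

        /** Whether the job runs in STREAMING or BATCH mode. */
        ExecutionMode getExecutionMode();

        enum ExecutionMode {
            STREAMING,
            BATCH
        }
    }
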
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
index 017bdcb8e25..a099272bd6d 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
@@ -18,14 +18,25 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
+    private final DefaultRuntimeContext context;
+
+    public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+        this.context = context;
+    }
 
     @Override
     public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
         // TODO implements this method.
     }
+
+    @Override
+    public JobInfo getJobInfo() {
+        return context.getJobInfo();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
index 22f61223ca1..492c8fa5c2c 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
@@ -18,9 +18,20 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.RuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 /** The default implementation of {@link RuntimeContext}. */
 public class DefaultRuntimeContext implements RuntimeContext {
-    // TODO implements this class
+    private final DefaultJobInfo jobInfo;
+
+    public DefaultRuntimeContext(StreamingRuntimeContext operatorContext) {
+        this.jobInfo = new DefaultJobInfo(operatorContext);
+    }
+
+    @Override
+    public JobInfo getJobInfo() {
+        return jobInfo;
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
index 7a76e923191..a6043f31a06 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
@@ -18,15 +18,27 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
 
 /** The default implementation of {@link TwoOutputNonPartitionedContext}. */
 public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
         implements TwoOutputNonPartitionedContext<OUT1, OUT2> {
+    private final DefaultRuntimeContext context;
+
+    public DefaultTwoOutputNonPartitionedContext(DefaultRuntimeContext context) {
+        this.context = context;
+    }
+
     @Override
     public void applyToAllPartitions(
             TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) {
         // TODO implements this method.
     }
+
+    @Override
+    public JobInfo getJobInfo() {
+        return context.getJobInfo();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 13204b27b02..40cc595ce53 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -52,9 +52,9 @@ public class ProcessOperator<IN, OUT>
     @Override
     public void open() throws Exception {
         super.open();
-        context = new DefaultRuntimeContext();
+        context = new DefaultRuntimeContext(getRuntimeContext());
         partitionedContext = new DefaultPartitionedContext(context);
-        nonPartitionedContext = new DefaultNonPartitionedContext<>();
+        nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
         outputCollector = getOutputCollector();
     }
 
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index 7c768715375..f8fdca63bd5 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -56,9 +56,9 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
     public void open() throws Exception {
         super.open();
         this.collector = getOutputCollector();
-        this.context = new DefaultRuntimeContext();
+        this.context = new DefaultRuntimeContext(getRuntimeContext());
         this.partitionedContext = new DefaultPartitionedContext(context);
-        this.nonPartitionedContext = new DefaultNonPartitionedContext<>();
+        this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
     }
 
     @Override
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 36e403afc35..dff0284ae28 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -56,9 +56,9 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT>
     public void open() throws Exception {
         super.open();
         this.collector = getOutputCollector();
-        this.context = new DefaultRuntimeContext();
+        this.context = new DefaultRuntimeContext(getRuntimeContext());
         this.partitionedContext = new DefaultPartitionedContext(context);
-        this.nonPartitionedContext = new DefaultNonPartitionedContext<>();
+        this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
     }
 
     @Override
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index 671f0ee15cb..42e39ee7965 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -67,9 +67,9 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
     public void open() throws Exception {
         this.mainCollector = getMainCollector();
         this.sideCollector = getSideCollector();
-        this.context = new DefaultRuntimeContext();
+        this.context = new DefaultRuntimeContext(getRuntimeContext());
         this.partitionedContext = new DefaultPartitionedContext(context);
-        this.nonPartitionedContext = new DefaultTwoOutputNonPartitionedContext<>();
+        this.nonPartitionedContext = new DefaultTwoOutputNonPartitionedContext<>(context);
     }
 
     @Override
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultJobInfoTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultJobInfoTest.java
new file mode 100644
index 00000000000..3e08793286f
--- /dev/null
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultJobInfoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.JobInfo;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultJobInfo}. */
+class DefaultJobInfoTest {
+    @Test
+    void testGetJobName() {
+        String jobName = "Test-Job";
+        DefaultJobInfo jobInfo = createDefaultJobInfo(jobName);
+        assertThat(jobInfo.getJobName()).isEqualTo(jobName);
+    }
+
+    @Test
+    void testGetExecutionMode() {
+        DefaultJobInfo batchJob = createDefaultJobInfo(JobType.BATCH);
+        assertThat(batchJob.getExecutionMode()).isEqualTo(JobInfo.ExecutionMode.BATCH);
+
+        DefaultJobInfo streamingJob = createDefaultJobInfo(JobType.STREAMING);
+        assertThat(streamingJob.getExecutionMode()).isEqualTo(JobInfo.ExecutionMode.STREAMING);
+    }
+
+    private static DefaultJobInfo createDefaultJobInfo(String jobName) {
+        return createDefaultJobInfo(JobType.STREAMING, jobName);
+    }
+
+    private static DefaultJobInfo createDefaultJobInfo(JobType jobType) {
+        return createDefaultJobInfo(jobType, "mock-job");
+    }
+
+    private static DefaultJobInfo createDefaultJobInfo(JobType jobType, String jobName) {
+        return new DefaultJobInfo(
+                new MockStreamingRuntimeContext(
+                        false,
+                        2,
+                        1,
+                        new MockEnvironmentBuilder()
+                                .setTaskName("mockTask")
+                                .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+                                .setParallelism(2)
+                                .setMaxParallelism(2)
+                                .setSubtaskIndex(1)
+                                .setJobType(jobType)
+                                .setJobName(jobName)
+                                .build()));
+    }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 350d0fc9875..00f471f1a7b 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -85,6 +86,8 @@ public class SavepointEnvironment implements Environment {
 
     private final JobID jobID;
 
+    private final JobType jobType;
+
     private final JobVertexID vertexID;
 
     private final ExecutionAttemptID attemptID;
@@ -124,6 +127,7 @@ public class SavepointEnvironment implements Environment {
             PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskState) {
         this.jobID = new JobID();
         this.vertexID = new JobVertexID();
+        this.jobType = JobType.STREAMING;
         this.attemptID =
                 new ExecutionAttemptID(
                         new ExecutionGraphID(), new ExecutionVertexID(vertexID, indexOfSubtask), 0);
@@ -156,6 +160,11 @@ public class SavepointEnvironment implements Environment {
         return jobID;
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Override
     public JobVertexID getJobVertexId() {
         return vertexID;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 467f698045b..80199f490c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -77,6 +78,8 @@ public interface Environment {
      */
     JobID getJobID();
 
+    JobType getJobType();
+
     /**
      * Gets the ID of the JobVertex for which this task executes a parallel subtask.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 7e41af896df..57be7f49abf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.failover.partitionrelease.Partiti
 import org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategyFactoryLoader;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -103,10 +104,12 @@ public class DefaultExecutionGraphBuilder {
 
         final String jobName = jobGraph.getName();
         final JobID jobId = jobGraph.getJobID();
+        final JobType jobType = jobGraph.getJobType();
 
         final JobInformation jobInformation =
                 new JobInformation(
                         jobId,
+                        jobType,
                         jobName,
                         jobGraph.getSerializedExecutionConfig(),
                         jobGraph.getJobConfiguration(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index 5792caa2711..94a9fc9b34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -42,6 +43,9 @@ public class JobInformation implements Serializable {
     /** Id of the job. */
     private final JobID jobId;
 
+    /** Type of the job. */
+    private final JobType jobType;
+
     /** Job name. */
     private final String jobName;
 
@@ -64,7 +68,26 @@ public class JobInformation implements Serializable {
             Configuration jobConfiguration,
             Collection<PermanentBlobKey> requiredJarFileBlobKeys,
             Collection<URL> requiredClasspathURLs) {
+        this(
+                jobId,
+                JobType.STREAMING,
+                jobName,
+                serializedExecutionConfig,
+                jobConfiguration,
+                requiredJarFileBlobKeys,
+                requiredClasspathURLs);
+    }
+
+    public JobInformation(
+            JobID jobId,
+            JobType jobType,
+            String jobName,
+            SerializedValue<ExecutionConfig> serializedExecutionConfig,
+            Configuration jobConfiguration,
+            Collection<PermanentBlobKey> requiredJarFileBlobKeys,
+            Collection<URL> requiredClasspathURLs) {
         this.jobId = Preconditions.checkNotNull(jobId);
+        this.jobType = Preconditions.checkNotNull(jobType);
         this.jobName = Preconditions.checkNotNull(jobName);
         this.serializedExecutionConfig = Preconditions.checkNotNull(serializedExecutionConfig);
         this.jobConfiguration =
@@ -83,6 +106,10 @@ public class JobInformation implements Serializable {
         return jobName;
     }
 
+    public JobType getJobType() {
+        return jobType;
+    }
+
     public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
         return serializedExecutionConfig;
     }
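
Note on compatibility: the pre-existing constructor now delegates with JobType.STREAMING, so existing callers keep compiling but are implicitly tagged as streaming jobs, while batch callers must switch to the new overload. A small sketch of the two call shapes, where the argument variables (jobId, jobName, serializedConfig, jobConf, blobKeys, classpaths) are placeholders:

    // Legacy constructor: getJobType() now reports STREAMING by default.
    JobInformation streamingInfo =
            new JobInformation(jobId, jobName, serializedConfig, jobConf, blobKeys, classpaths);

    // New overload: callers such as DefaultExecutionGraphBuilder pass the JobGraph's type explicitly.
    JobInformation batchInfo =
            new JobInformation(jobId, JobType.BATCH, jobName, serializedConfig, jobConf, blobKeys, classpaths);
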
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 104b42c2b1a..d5e297c54b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -64,6 +65,9 @@ public class RuntimeEnvironment implements Environment {
 
     private final JobID jobId;
     private final JobVertexID jobVertexId;
+
+    private final JobType jobType;
+
     private final ExecutionAttemptID executionId;
 
     private final JobInfo jobInfo;
@@ -118,6 +122,7 @@ public class RuntimeEnvironment implements Environment {
 
     public RuntimeEnvironment(
             JobID jobId,
+            JobType jobType,
             JobVertexID jobVertexId,
             ExecutionAttemptID executionId,
             ExecutionConfig executionConfig,
@@ -149,6 +154,7 @@ public class RuntimeEnvironment implements Environment {
             TaskManagerActions taskManagerActions) {
 
         this.jobId = checkNotNull(jobId);
+        this.jobType = checkNotNull(jobType);
         this.jobVertexId = checkNotNull(jobVertexId);
         this.executionId = checkNotNull(executionId);
         this.jobInfo = checkNotNull(jobInfo);
@@ -192,6 +198,11 @@ public class RuntimeEnvironment implements Environment {
         return jobId;
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Override
     public JobVertexID getJobVertexId() {
         return jobVertexId;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e813de5c398..afc74f9a98d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
@@ -162,6 +163,9 @@ public class Task
     /** The job that the task belongs to. */
     private final JobID jobId;
 
+    /** The type of this job. */
+    private final JobType jobType;
+
     /** The vertex in the JobGraph whose code the task executes. */
     private final JobVertexID vertexId;
 
@@ -352,6 +356,7 @@ public class Task
                         String.valueOf(slotAllocationId));
 
         this.jobId = jobInformation.getJobId();
+        this.jobType = jobInformation.getJobType();
         this.vertexId = taskInformation.getJobVertexId();
         this.executionId = Preconditions.checkNotNull(executionAttemptID);
         this.allocationId = Preconditions.checkNotNull(slotAllocationId);
@@ -695,6 +700,7 @@ public class Task
             Environment env =
                     new RuntimeEnvironment(
                             jobId,
+                            jobType,
                             vertexId,
                             executionId,
                             executionConfig,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index edb85cf19c5..0a6a425c4ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -72,6 +73,7 @@ public class DummyEnvironment implements Environment {
 
     private final JobInfo jobInfo = new JobInfoImpl(new JobID(), "DummyJob");
     private final JobVertexID jobVertexId = new JobVertexID();
+    private final JobType jobType = JobType.STREAMING;
     private final ExecutionAttemptID executionId;
     private final ExecutionConfig executionConfig = new ExecutionConfig();
     private final TaskInfo taskInfo;
@@ -128,6 +130,11 @@ public class DummyEnvironment implements Environment {
         return jobInfo.getJobId();
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Override
     public JobVertexID getJobVertexId() {
         return jobVertexId;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9d0249c1257..ec9bfcb917d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordCollectingResultPart
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -85,6 +86,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
     private final TaskInfo taskInfo;
 
+    private final JobType jobType;
+
     private final ExecutionConfig executionConfig;
 
     private final MemoryManager memManager;
@@ -149,7 +152,9 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
     protected MockEnvironment(
             JobID jobID,
+            String jobName,
             JobVertexID jobVertexID,
+            JobType jobType,
             String taskName,
             MockInputSplitProvider inputSplitProvider,
             int bufferSize,
@@ -168,9 +173,9 @@ public class MockEnvironment implements Environment, AutoCloseable {
             ExternalResourceInfoProvider externalResourceInfoProvider,
             ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory) {
 
-        this.jobInfo = new JobInfoImpl(jobID, "MockJob");
+        this.jobInfo = new JobInfoImpl(jobID, jobName);
         this.jobVertexID = jobVertexID;
-
+        this.jobType = jobType;
         this.taskInfo = new TaskInfoImpl(taskName, maxParallelism, subtaskIndex, parallelism, 0);
         this.jobConfiguration = new Configuration();
         this.taskConfiguration = taskConfiguration;
@@ -269,6 +274,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
         return this.jobInfo.getJobId();
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Override
     public Configuration getJobConfiguration() {
         return this.jobConfiguration;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index ec61b9a6fa5..2a8c7c273a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecu
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
@@ -41,6 +42,7 @@ import org.apache.flink.util.UserCodeClassLoader;
 
 public class MockEnvironmentBuilder {
     private String taskName = "mock-task";
+    private String jobName = "mock-job";
     private MockInputSplitProvider inputSplitProvider = null;
     private int bufferSize = 16;
     private TaskStateManager taskStateManager = new TestTaskStateManager();
@@ -54,6 +56,7 @@ public class MockEnvironmentBuilder {
             TestingUserCodeClassLoader.newBuilder().build();
     private JobID jobID = new JobID();
     private JobVertexID jobVertexID = new JobVertexID();
+    private JobType jobType = JobType.STREAMING;
     private TaskMetricGroup taskMetricGroup =
             UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
    private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
@@ -136,6 +139,11 @@ public class MockEnvironmentBuilder {
         return this;
     }
 
+    public MockEnvironmentBuilder setJobName(String jobName) {
+        this.jobName = jobName;
+        return this;
+    }
+
     public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
         this.jobVertexID = jobVertexID;
         return this;
@@ -168,13 +176,20 @@ public class MockEnvironmentBuilder {
         return this;
     }
 
+    public MockEnvironmentBuilder setJobType(JobType jobType) {
+        this.jobType = jobType;
+        return this;
+    }
+
     public MockEnvironment build() {
         if (ioManager == null) {
             ioManager = new IOManagerAsync();
         }
         return new MockEnvironment(
                 jobID,
+                jobName,
                 jobVertexID,
+                jobType,
                 taskName,
                 inputSplitProvider,
                 bufferSize,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 2e82d1c2b08..d7bb2bf57f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -40,6 +40,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.state.v2.KeyedStateStoreV2;
@@ -169,6 +170,10 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
         return taskEnvironment.getJobConfiguration();
     }
 
+    public JobType getJobType() {
+        return taskEnvironment.getJobType();
+    }
+
     @Override
     public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
         return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7e9f04be81c..9fb6b733762 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -285,6 +286,11 @@ public class StreamMockEnvironment implements Environment {
         return this.jobInfo.getJobId();
     }
 
+    @Override
+    public JobType getJobType() {
+        return JobType.STREAMING;
+    }
+
     @Override
     public Configuration getJobConfiguration() {
         return this.jobConfiguration;

