This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0bc2041b23233eff9622753a0d4cfa3b03816421
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Apr 9 11:47:35 2024 +0800

    [FLINK-34549][API] Implement TaskInfo and expose it via RuntimeContext
---
 .../datastream/api/context/RuntimeContext.java     |  3 ++
 .../impl/context/DefaultNonPartitionedContext.java |  6 ++++
 .../impl/context/DefaultPartitionedContext.java    |  6 ++++
 .../impl/context/DefaultRuntimeContext.java        | 12 +++++++-
 ...artitionedContext.java => DefaultTaskInfo.java} | 33 ++++++++++++++--------
 .../DefaultTwoOutputNonPartitionedContext.java     |  6 ++++
 .../datastream/impl/operators/ProcessOperator.java |  8 +++++-
 .../TwoInputBroadcastProcessOperator.java          |  8 +++++-
 .../TwoInputNonBroadcastProcessOperator.java       |  8 +++++-
 .../impl/operators/TwoOutputProcessOperator.java   |  8 +++++-
 .../impl/context/DefaultTaskInfoTest.java}         | 23 +++++++--------
 11 files changed, 91 insertions(+), 30 deletions(-)

diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
index 40476469b6b..c82929113f0 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
@@ -30,4 +30,7 @@ import org.apache.flink.annotation.Experimental;
 public interface RuntimeContext {
     /** Get the {@link JobInfo} of this process function. */
     JobInfo getJobInfo();
+
+    /** Get the {@link TaskInfo} of this process function. */
+    TaskInfo getTaskInfo();
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
index a099272bd6d..d2fc17a9463 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.datastream.impl.context;
 
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 
 /** The default implementation of {@link NonPartitionedContext}. */
@@ -39,4 +40,9 @@ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<
     public JobInfo getJobInfo() {
         return context.getJobInfo();
     }
+
+    @Override
+    public TaskInfo getTaskInfo() {
+        return context.getTaskInfo();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
index cdf3dc952df..e6c4b2d02a9 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.datastream.impl.context;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.PartitionedContext;
 import org.apache.flink.datastream.api.context.RuntimeContext;
+import org.apache.flink.datastream.api.context.TaskInfo;
 
 /** The default implementation of {@link PartitionedContext}. */
 public class DefaultPartitionedContext implements PartitionedContext {
@@ -34,4 +35,9 @@ public class DefaultPartitionedContext implements PartitionedContext {
     public JobInfo getJobInfo() {
         return context.getJobInfo();
     }
+
+    @Override
+    public TaskInfo getTaskInfo() {
+        return context.getTaskInfo();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
index 7b2c97aa93a..2e1820022d7 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
@@ -20,18 +20,28 @@ package org.apache.flink.datastream.impl.context;
 
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.RuntimeContext;
+import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.runtime.jobgraph.JobType;
 
 /** The default implementation of {@link RuntimeContext}. */
 public class DefaultRuntimeContext implements RuntimeContext {
     private final DefaultJobInfo jobInfo;
 
-    public DefaultRuntimeContext(String jobName, JobType jobType) {
+    private final DefaultTaskInfo taskInfo;
+
+    public DefaultRuntimeContext(
+            String jobName, JobType jobType, int parallelism, int maxParallelism, String taskName) {
         this.jobInfo = new DefaultJobInfo(jobName, jobType);
+        this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
     }
 
     @Override
     public JobInfo getJobInfo() {
         return jobInfo;
     }
+
+    @Override
+    public TaskInfo getTaskInfo() {
+        return taskInfo;
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTaskInfo.java
similarity index 56%
copy from flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
copy to flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTaskInfo.java
index a099272bd6d..c2589e93148 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTaskInfo.java
@@ -18,25 +18,34 @@
 
 package org.apache.flink.datastream.impl.context;
 
-import org.apache.flink.datastream.api.context.JobInfo;
-import org.apache.flink.datastream.api.context.NonPartitionedContext;
-import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
+import org.apache.flink.datastream.api.context.TaskInfo;
 
-/** The default implementation of {@link NonPartitionedContext}. */
-public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
-    private final DefaultRuntimeContext context;
+/** Default implementation of {@link TaskInfo}. */
+public class DefaultTaskInfo implements TaskInfo {
+    private final int parallelism;
 
-    public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
-        this.context = context;
+    private final int maxParallelism;
+
+    private final String taskName;
+
+    public DefaultTaskInfo(int parallelism, int maxParallelism, String taskName) {
+        this.parallelism = parallelism;
+        this.maxParallelism = maxParallelism;
+        this.taskName = taskName;
+    }
+
+    @Override
+    public int getParallelism() {
+        return parallelism;
     }
 
     @Override
-    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
-        // TODO implements this method.
+    public int getMaxParallelism() {
+        return maxParallelism;
     }
 
     @Override
-    public JobInfo getJobInfo() {
-        return context.getJobInfo();
+    public String getTaskName() {
+        return taskName;
     }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
index a6043f31a06..3ae9d4cab2d 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.context;
 
 import org.apache.flink.datastream.api.context.JobInfo;
+import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
 
@@ -41,4 +42,9 @@ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
     public JobInfo getJobInfo() {
         return context.getJobInfo();
     }
+
+    @Override
+    public TaskInfo getTaskInfo() {
+        return context.getTaskInfo();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 877398683bc..a53bf41e3a6 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.datastream.impl.operators;
 
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
@@ -54,9 +55,14 @@ public class ProcessOperator<IN, OUT>
     public void open() throws Exception {
         super.open();
         StreamingRuntimeContext operatorContext = getRuntimeContext();
+        TaskInfo taskInfo = operatorContext.getTaskInfo();
         context =
                 new DefaultRuntimeContext(
-                        operatorContext.getJobInfo().getJobName(), operatorContext.getJobType());
+                        operatorContext.getJobInfo().getJobName(),
+                        operatorContext.getJobType(),
+                        taskInfo.getNumberOfParallelSubtasks(),
+                        taskInfo.getMaxNumberOfParallelSubtasks(),
+                        taskInfo.getTaskName());
         partitionedContext = new DefaultPartitionedContext(context);
         nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
         outputCollector = getOutputCollector();
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index c926d404d46..1486d84e65a 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.datastream.impl.operators;
 
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
@@ -58,9 +59,14 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
         super.open();
         this.collector = getOutputCollector();
         StreamingRuntimeContext operatorContext = getRuntimeContext();
+        TaskInfo taskInfo = operatorContext.getTaskInfo();
         this.context =
                 new DefaultRuntimeContext(
-                        operatorContext.getJobInfo().getJobName(), operatorContext.getJobType());
+                        operatorContext.getJobInfo().getJobName(),
+                        operatorContext.getJobType(),
+                        taskInfo.getNumberOfParallelSubtasks(),
+                        taskInfo.getMaxNumberOfParallelSubtasks(),
+                        taskInfo.getTaskName());
         this.partitionedContext = new DefaultPartitionedContext(context);
         this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
     }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 20846c0faea..93f667d875f 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.datastream.impl.operators;
 
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
@@ -58,9 +59,14 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT>
         super.open();
         this.collector = getOutputCollector();
         StreamingRuntimeContext operatorContext = getRuntimeContext();
+        TaskInfo taskInfo = operatorContext.getTaskInfo();
         this.context =
                 new DefaultRuntimeContext(
-                        operatorContext.getJobInfo().getJobName(), operatorContext.getJobType());
+                        operatorContext.getJobInfo().getJobName(),
+                        operatorContext.getJobType(),
+                        taskInfo.getNumberOfParallelSubtasks(),
+                        taskInfo.getMaxNumberOfParallelSubtasks(),
+                        taskInfo.getTaskName());
         this.partitionedContext = new DefaultPartitionedContext(context);
         this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
     }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index f24211f0abf..fff9086d94b 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.datastream.impl.operators;
 
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
@@ -69,9 +70,14 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
         this.mainCollector = getMainCollector();
         this.sideCollector = getSideCollector();
         StreamingRuntimeContext operatorContext = getRuntimeContext();
+        TaskInfo taskInfo = operatorContext.getTaskInfo();
         this.context =
                 new DefaultRuntimeContext(
-                        operatorContext.getJobInfo().getJobName(), operatorContext.getJobType());
+                        operatorContext.getJobInfo().getJobName(),
+                        operatorContext.getJobType(),
+                        taskInfo.getNumberOfParallelSubtasks(),
+                        taskInfo.getMaxNumberOfParallelSubtasks(),
+                        taskInfo.getTaskName());
         this.partitionedContext = new DefaultPartitionedContext(context);
         this.nonPartitionedContext = new DefaultTwoOutputNonPartitionedContext<>(context);
     }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTaskInfoTest.java
similarity index 61%
copy from flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
copy to flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTaskInfoTest.java
index 7b2c97aa93a..2f90511675d 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTaskInfoTest.java
@@ -18,20 +18,17 @@
 
 package org.apache.flink.datastream.impl.context;
 
-import org.apache.flink.datastream.api.context.JobInfo;
-import org.apache.flink.datastream.api.context.RuntimeContext;
-import org.apache.flink.runtime.jobgraph.JobType;
+import org.junit.jupiter.api.Test;
 
-/** The default implementation of {@link RuntimeContext}. */
-public class DefaultRuntimeContext implements RuntimeContext {
-    private final DefaultJobInfo jobInfo;
+import static org.assertj.core.api.Assertions.assertThat;
 
-    public DefaultRuntimeContext(String jobName, JobType jobType) {
-        this.jobInfo = new DefaultJobInfo(jobName, jobType);
-    }
-
-    @Override
-    public JobInfo getJobInfo() {
-        return jobInfo;
+/** Tests for {@link DefaultTaskInfo}. */
+class DefaultTaskInfoTest {
+    @Test
+    void testTaskInfo() {
+        DefaultTaskInfo taskInfo = new DefaultTaskInfo(1, 2, "taskName");
+        assertThat(taskInfo.getParallelism()).isEqualTo(1);
+        assertThat(taskInfo.getMaxParallelism()).isEqualTo(2);
+        assertThat(taskInfo.getTaskName()).isEqualTo("taskName");
     }
 }

Reply via email to