This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0bc2041b23233eff9622753a0d4cfa3b03816421 Author: Weijie Guo <res...@163.com> AuthorDate: Tue Apr 9 11:47:35 2024 +0800 [FLINK-34549][API] Implement TaskInfo and expose it via RuntimeContext --- .../datastream/api/context/RuntimeContext.java | 3 ++ .../impl/context/DefaultNonPartitionedContext.java | 6 ++++ .../impl/context/DefaultPartitionedContext.java | 6 ++++ .../impl/context/DefaultRuntimeContext.java | 12 +++++++- ...artitionedContext.java => DefaultTaskInfo.java} | 33 ++++++++++++++-------- .../DefaultTwoOutputNonPartitionedContext.java | 6 ++++ .../datastream/impl/operators/ProcessOperator.java | 8 +++++- .../TwoInputBroadcastProcessOperator.java | 8 +++++- .../TwoInputNonBroadcastProcessOperator.java | 8 +++++- .../impl/operators/TwoOutputProcessOperator.java | 8 +++++- .../impl/context/DefaultTaskInfoTest.java} | 23 +++++++-------- 11 files changed, 91 insertions(+), 30 deletions(-) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java index 40476469b6b..c82929113f0 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java @@ -30,4 +30,7 @@ import org.apache.flink.annotation.Experimental; public interface RuntimeContext { /** Get the {@link JobInfo} of this process function. */ JobInfo getJobInfo(); + + /** Get the {@link TaskInfo} of this process function. */ + TaskInfo getTaskInfo(); } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java index a099272bd6d..d2fc17a9463 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java @@ -20,6 +20,7 @@ package org.apache.flink.datastream.impl.context; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.datastream.api.function.ApplyPartitionFunction; /** The default implementation of {@link NonPartitionedContext}. */ @@ -39,4 +40,9 @@ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext< public JobInfo getJobInfo() { return context.getJobInfo(); } + + @Override + public TaskInfo getTaskInfo() { + return context.getTaskInfo(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java index cdf3dc952df..e6c4b2d02a9 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java @@ -21,6 +21,7 @@ package org.apache.flink.datastream.impl.context; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.context.RuntimeContext; +import org.apache.flink.datastream.api.context.TaskInfo; /** The default implementation of {@link PartitionedContext}. */ public class DefaultPartitionedContext implements PartitionedContext { @@ -34,4 +35,9 @@ public class DefaultPartitionedContext implements PartitionedContext { public JobInfo getJobInfo() { return context.getJobInfo(); } + + @Override + public TaskInfo getTaskInfo() { + return context.getTaskInfo(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java index 7b2c97aa93a..2e1820022d7 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java @@ -20,18 +20,28 @@ package org.apache.flink.datastream.impl.context; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.RuntimeContext; +import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.runtime.jobgraph.JobType; /** The default implementation of {@link RuntimeContext}. */ public class DefaultRuntimeContext implements RuntimeContext { private final DefaultJobInfo jobInfo; - public DefaultRuntimeContext(String jobName, JobType jobType) { + private final DefaultTaskInfo taskInfo; + + public DefaultRuntimeContext( + String jobName, JobType jobType, int parallelism, int maxParallelism, String taskName) { this.jobInfo = new DefaultJobInfo(jobName, jobType); + this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName); } @Override public JobInfo getJobInfo() { return jobInfo; } + + @Override + public TaskInfo getTaskInfo() { + return taskInfo; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTaskInfo.java similarity index 56% copy from flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java copy to flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTaskInfo.java index a099272bd6d..c2589e93148 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTaskInfo.java @@ -18,25 +18,34 @@ package org.apache.flink.datastream.impl.context; -import org.apache.flink.datastream.api.context.JobInfo; -import org.apache.flink.datastream.api.context.NonPartitionedContext; -import org.apache.flink.datastream.api.function.ApplyPartitionFunction; +import org.apache.flink.datastream.api.context.TaskInfo; -/** The default implementation of {@link NonPartitionedContext}. */ -public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> { - private final DefaultRuntimeContext context; +/** Default implementation of {@link TaskInfo}. */ +public class DefaultTaskInfo implements TaskInfo { + private final int parallelism; - public DefaultNonPartitionedContext(DefaultRuntimeContext context) { - this.context = context; + private final int maxParallelism; + + private final String taskName; + + public DefaultTaskInfo(int parallelism, int maxParallelism, String taskName) { + this.parallelism = parallelism; + this.maxParallelism = maxParallelism; + this.taskName = taskName; + } + + @Override + public int getParallelism() { + return parallelism; } @Override - public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) { - // TODO implements this method. + public int getMaxParallelism() { + return maxParallelism; } @Override - public JobInfo getJobInfo() { - return context.getJobInfo(); + public String getTaskName() { + return taskName; } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java index a6043f31a06..3ae9d4cab2d 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java @@ -19,6 +19,7 @@ package org.apache.flink.datastream.impl.context; import org.apache.flink.datastream.api.context.JobInfo; +import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction; @@ -41,4 +42,9 @@ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> public JobInfo getJobInfo() { return context.getJobInfo(); } + + @Override + public TaskInfo getTaskInfo() { + return context.getTaskInfo(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 877398683bc..a53bf41e3a6 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.datastream.impl.operators; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; @@ -54,9 +55,14 @@ public class ProcessOperator<IN, OUT> public void open() throws Exception { super.open(); StreamingRuntimeContext operatorContext = getRuntimeContext(); + TaskInfo taskInfo = operatorContext.getTaskInfo(); context = new DefaultRuntimeContext( - operatorContext.getJobInfo().getJobName(), operatorContext.getJobType()); + operatorContext.getJobInfo().getJobName(), + operatorContext.getJobType(), + taskInfo.getNumberOfParallelSubtasks(), + taskInfo.getMaxNumberOfParallelSubtasks(), + taskInfo.getTaskName()); partitionedContext = new DefaultPartitionedContext(context); nonPartitionedContext = new DefaultNonPartitionedContext<>(context); outputCollector = getOutputCollector(); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index c926d404d46..1486d84e65a 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.datastream.impl.operators; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; @@ -58,9 +59,14 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT> super.open(); this.collector = getOutputCollector(); StreamingRuntimeContext operatorContext = getRuntimeContext(); + TaskInfo taskInfo = operatorContext.getTaskInfo(); this.context = new DefaultRuntimeContext( - operatorContext.getJobInfo().getJobName(), operatorContext.getJobType()); + operatorContext.getJobInfo().getJobName(), + operatorContext.getJobType(), + taskInfo.getNumberOfParallelSubtasks(), + taskInfo.getMaxNumberOfParallelSubtasks(), + taskInfo.getTaskName()); this.partitionedContext = new DefaultPartitionedContext(context); this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context); } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index 20846c0faea..93f667d875f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.datastream.impl.operators; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; @@ -58,9 +59,14 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT> super.open(); this.collector = getOutputCollector(); StreamingRuntimeContext operatorContext = getRuntimeContext(); + TaskInfo taskInfo = operatorContext.getTaskInfo(); this.context = new DefaultRuntimeContext( - operatorContext.getJobInfo().getJobName(), operatorContext.getJobType()); + operatorContext.getJobInfo().getJobName(), + operatorContext.getJobType(), + taskInfo.getNumberOfParallelSubtasks(), + taskInfo.getMaxNumberOfParallelSubtasks(), + taskInfo.getTaskName()); this.partitionedContext = new DefaultPartitionedContext(context); this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context); } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index f24211f0abf..fff9086d94b 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.datastream.impl.operators; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.datastream.impl.common.OutputCollector; @@ -69,9 +70,14 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE> this.mainCollector = getMainCollector(); this.sideCollector = getSideCollector(); StreamingRuntimeContext operatorContext = getRuntimeContext(); + TaskInfo taskInfo = operatorContext.getTaskInfo(); this.context = new DefaultRuntimeContext( - operatorContext.getJobInfo().getJobName(), operatorContext.getJobType()); + operatorContext.getJobInfo().getJobName(), + operatorContext.getJobType(), + taskInfo.getNumberOfParallelSubtasks(), + taskInfo.getMaxNumberOfParallelSubtasks(), + taskInfo.getTaskName()); this.partitionedContext = new DefaultPartitionedContext(context); this.nonPartitionedContext = new DefaultTwoOutputNonPartitionedContext<>(context); } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTaskInfoTest.java similarity index 61% copy from flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java copy to flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTaskInfoTest.java index 7b2c97aa93a..2f90511675d 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTaskInfoTest.java @@ -18,20 +18,17 @@ package org.apache.flink.datastream.impl.context; -import org.apache.flink.datastream.api.context.JobInfo; -import org.apache.flink.datastream.api.context.RuntimeContext; -import org.apache.flink.runtime.jobgraph.JobType; +import org.junit.jupiter.api.Test; -/** The default implementation of {@link RuntimeContext}. */ -public class DefaultRuntimeContext implements RuntimeContext { - private final DefaultJobInfo jobInfo; +import static org.assertj.core.api.Assertions.assertThat; - public DefaultRuntimeContext(String jobName, JobType jobType) { - this.jobInfo = new DefaultJobInfo(jobName, jobType); - } - - @Override - public JobInfo getJobInfo() { - return jobInfo; +/** Tests for {@link DefaultTaskInfo}. */ +class DefaultTaskInfoTest { + @Test + void testTaskInfo() { + DefaultTaskInfo taskInfo = new DefaultTaskInfo(1, 2, "taskName"); + assertThat(taskInfo.getParallelism()).isEqualTo(1); + assertThat(taskInfo.getMaxParallelism()).isEqualTo(2); + assertThat(taskInfo.getTaskName()).isEqualTo("taskName"); } }