Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r154871284 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java --- @@ -19,68 +19,104 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + public class ScheduledUnit { - + + @Nullable private final Execution vertexExecution; - - private final SlotSharingGroup sharingGroup; - - private final CoLocationConstraint locationConstraint; + + private final JobVertexID jobVertexId; + + @Nullable + private final SlotSharingGroupId slotSharingGroupId; + + @Nullable + private final CoLocationConstraint coLocationConstraint; // -------------------------------------------------------------------------------------------- public ScheduledUnit(Execution task) { - Preconditions.checkNotNull(task); - - this.vertexExecution = task; - this.sharingGroup = null; - this.locationConstraint = null; + this( + Preconditions.checkNotNull(task), + task.getVertex().getJobvertexId(), + null, + null); } - public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) { - Preconditions.checkNotNull(task); - - this.vertexExecution = task; - this.sharingGroup = sharingUnit; - this.locationConstraint = null; + public ScheduledUnit(Execution task, SlotSharingGroupId slotSharingGroupId) { + this( + Preconditions.checkNotNull(task), + task.getVertex().getJobvertexId(), + slotSharingGroupId, + null); } - public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) { - Preconditions.checkNotNull(task); - Preconditions.checkNotNull(sharingUnit); - Preconditions.checkNotNull(locationConstraint); - + public ScheduledUnit( + Execution task, + SlotSharingGroupId slotSharingGroupId, + CoLocationConstraint coLocationConstraint) { + this( + Preconditions.checkNotNull(task), + task.getVertex().getJobvertexId(), + slotSharingGroupId, + coLocationConstraint); + } + + public ScheduledUnit( + JobVertexID jobVertexId, + SlotSharingGroupId slotSharingGroupId, + CoLocationConstraint coLocationConstraint) { + this( + null, + jobVertexId, + slotSharingGroupId, + coLocationConstraint); + } + + public ScheduledUnit( + Execution task, + JobVertexID jobVertexId, --- End diff -- We can get JobVertexID from Execution. Do we need this in Constructor?
---