Github user ifndef-SleePy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r154871284
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java ---
    @@ -19,68 +19,104 @@
     package org.apache.flink.runtime.jobmanager.scheduler;
     
     import org.apache.flink.runtime.executiongraph.Execution;
    +import org.apache.flink.runtime.instance.SlotSharingGroupId;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
     import org.apache.flink.util.Preconditions;
     
    +import javax.annotation.Nullable;
    +
     public class ScheduledUnit {
    -   
    +
    +   @Nullable
        private final Execution vertexExecution;
    -   
    -   private final SlotSharingGroup sharingGroup;
    -   
    -   private final CoLocationConstraint locationConstraint;
    +
    +   private final JobVertexID jobVertexId;
    +
    +   @Nullable
    +   private final SlotSharingGroupId slotSharingGroupId;
    +
    +   @Nullable
    +   private final CoLocationConstraint coLocationConstraint;
        
        // 
--------------------------------------------------------------------------------------------
        
        public ScheduledUnit(Execution task) {
    -           Preconditions.checkNotNull(task);
    -           
    -           this.vertexExecution = task;
    -           this.sharingGroup = null;
    -           this.locationConstraint = null;
    +           this(
    +                   Preconditions.checkNotNull(task),
    +                   task.getVertex().getJobvertexId(),
    +                   null,
    +                   null);
        }
        
    -   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
    -           Preconditions.checkNotNull(task);
    -           
    -           this.vertexExecution = task;
    -           this.sharingGroup = sharingUnit;
    -           this.locationConstraint = null;
    +   public ScheduledUnit(Execution task, SlotSharingGroupId 
slotSharingGroupId) {
    +           this(
    +                   Preconditions.checkNotNull(task),
    +                   task.getVertex().getJobvertexId(),
    +                   slotSharingGroupId,
    +                   null);
        }
        
    -   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, 
CoLocationConstraint locationConstraint) {
    -           Preconditions.checkNotNull(task);
    -           Preconditions.checkNotNull(sharingUnit);
    -           Preconditions.checkNotNull(locationConstraint);
    -           
    +   public ScheduledUnit(
    +                   Execution task,
    +                   SlotSharingGroupId slotSharingGroupId,
    +                   CoLocationConstraint coLocationConstraint) {
    +           this(
    +                   Preconditions.checkNotNull(task),
    +                   task.getVertex().getJobvertexId(),
    +                   slotSharingGroupId,
    +                   coLocationConstraint);
    +   }
    +
    +   public ScheduledUnit(
    +                   JobVertexID jobVertexId,
    +                   SlotSharingGroupId slotSharingGroupId,
    +                   CoLocationConstraint coLocationConstraint) {
    +           this(
    +                   null,
    +                   jobVertexId,
    +                   slotSharingGroupId,
    +                   coLocationConstraint);
    +   }
    +
    +   public ScheduledUnit(
    +           Execution task,
    +           JobVertexID jobVertexId,
    --- End diff --
    
    We can already get the JobVertexID from the Execution. Do we need it as a separate parameter in this constructor?


---

Reply via email to