[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235477#comment-16235477 ]
ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4916

    Thanks a lot for your review @StephanEwen and @sihuazhou. I've addressed
    all your comments @StephanEwen. Next, I'll rebase onto the latest master;
    if Travis gives the green light and you have no further objections, I
    would like to merge it.

> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.4.0, 1.3.3
>
>
> ExecutionGraph.scheduleEager() allocates resources for each
> ExecutionJobVertex one by one by calling
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems
> with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return
> an empty result, because `sourceSlot` stays null until the
> `ExecutionVertex` has been deployed via `Execution.deployToSlot()`. So
> allocating resources based on the preferred location cannot work
> correctly. Should we set the slot info for `Execution` as soon as
> Execution.allocateSlotForExecution() has been called successfully?
> 2. The current allocation strategy does not allocate slots optimally.
> Here is the test case:
> {code}
> // jid1 and jid2 are the vertex IDs defined elsewhere in the test
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> // both vertices share one slot sharing group
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> // v1 feeds v2 through a pointwise, pipelined connection
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE,
>     ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and
> three remote partitions. It should be two local partitions and two remote
> partitions.
> The cause of the above problems is that the current strategy allocates
> resources for executions one by one (if an execution can get a slot from
> the SlotGroup, it takes it; otherwise it asks for a new one).
> Changing the allocation strategy to two steps would solve this problem.
> Below is the pseudocode:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // step 1: try to allocate from the SlotGroup based on inputs, one by
>     //         one (allocate only where the preferred location matches).
>     // step 2: allocate for the remaining executions.
> }
> {code}
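
The difference between one-by-one allocation and the two-step strategy from the issue description can be illustrated with a small toy model. This is only a sketch and not Flink code: the class SlotAllocationSketch and its methods are hypothetical. It assumes that every slot is its own location, that a shared slot holds at most one subtask per vertex, that v1 subtask j already occupies slot j, and that the parallelism of v2 is a multiple of that of v1, so that v2 subtask i reads from v1 subtask i / (p2 / p1).

```java
import java.util.Arrays;

/** Toy model of slot allocation in a sharing group; NOT Flink's implementation. */
class SlotAllocationSketch {

    /** One-by-one: take the preferred slot, else any free shared slot, else a new slot. */
    static int[] allocateOneByOne(int p1, int p2) {
        boolean[] usedByV2 = new boolean[p1 + p2]; // upper bound on the number of slots
        int[] slotOf = new int[p2];
        int nextNewSlot = p1; // slots 0..p1-1 already exist (held by v1)
        for (int i = 0; i < p2; i++) {
            int preferred = i / (p2 / p1); // slot of the v1 subtask this subtask reads from
            int slot = usedByV2[preferred] ? firstFreeShared(usedByV2, p1) : preferred;
            if (slot < 0) {
                slot = nextNewSlot++;
            }
            usedByV2[slot] = true;
            slotOf[i] = slot;
        }
        return slotOf;
    }

    /** Two-step: first place only location-matching subtasks, then place the rest. */
    static int[] allocateTwoStep(int p1, int p2) {
        boolean[] usedByV2 = new boolean[p1 + p2];
        int[] slotOf = new int[p2];
        Arrays.fill(slotOf, -1);
        // Step 1: allocate from the shared slots only where the preferred location is free.
        for (int i = 0; i < p2; i++) {
            int preferred = i / (p2 / p1);
            if (!usedByV2[preferred]) {
                usedByV2[preferred] = true;
                slotOf[i] = preferred;
            }
        }
        // Step 2: allocate for the remaining subtasks.
        int nextNewSlot = p1;
        for (int i = 0; i < p2; i++) {
            if (slotOf[i] >= 0) {
                continue;
            }
            int slot = firstFreeShared(usedByV2, p1);
            if (slot < 0) {
                slot = nextNewSlot++;
            }
            usedByV2[slot] = true;
            slotOf[i] = slot;
        }
        return slotOf;
    }

    /** Returns the first shared slot not yet used by v2, or -1 if there is none. */
    static int firstFreeShared(boolean[] usedByV2, int p1) {
        for (int s = 0; s < p1; s++) {
            if (!usedByV2[s]) {
                return s;
            }
        }
        return -1;
    }

    /** Counts v2 subtasks that ended up in the same slot as their input. */
    static int countLocal(int[] slotOf, int p1) {
        int ratio = slotOf.length / p1;
        int local = 0;
        for (int i = 0; i < slotOf.length; i++) {
            if (slotOf[i] == i / ratio) {
                local++;
            }
        }
        return local;
    }

    public static void main(String[] args) {
        int greedy = countLocal(allocateOneByOne(2, 4), 2);
        int twoStep = countLocal(allocateTwoStep(2, 4), 2);
        System.out.println("one-by-one: " + greedy + " local, " + (4 - greedy) + " remote");
        System.out.println("two-step:   " + twoStep + " local, " + (4 - twoStep) + " remote");
    }
}
```

Under these assumptions, the model reproduces the numbers reported in the issue for parallelism 2 and 4: one local and three remote partitions with one-by-one allocation, two local and two remote with the two-step strategy. The real scheduler has more constraints, so this is only meant to show why deferring the non-matching subtasks helps.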