[ https://issues.apache.org/jira/browse/FLINK-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lining updated FLINK-14740:
---------------------------
    Description: 
In the current design:

A DataSet job uses the VertexID as the OperatorID of its OperatorMetricGroup (see TaskMetricGroup.getOrAddOperator(String name)).

Because operator groups are registered in the TaskMetricGroup by name, and all operators in a vertex share the vertex's ID, two operators in the same vertex that have the same name overwrite each other.
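To make the collision concrete, here is a minimal, self-contained model of it. It is a simplified sketch, not Flink's TaskMetricGroup: the class NameKeyedRegistryDemo, its OperatorGroup holder and both lookup methods are hypothetical, and the name-keyed lookup is assumed to mirror what getOrAddOperator(String name) does. With a name-keyed lookup, two operators called "Map" in one vertex resolve to the same group; with a lookup keyed by a per-operator ID, they stay separate.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the collision; this is NOT Flink's TaskMetricGroup.
public class NameKeyedRegistryDemo {

    // Stand-in for a metric group: remembers the operator ID it was created with.
    public static final class OperatorGroup {
        public final String operatorId;
        public final String name;

        OperatorGroup(String operatorId, String name) {
            this.operatorId = operatorId;
            this.name = name;
        }
    }

    // Keyed by name only: a second operator with the same name gets the existing entry.
    static final Map<String, OperatorGroup> byName = new HashMap<>();

    // Keyed by a per-operator ID: operators with equal names get separate entries.
    static final Map<String, OperatorGroup> byId = new HashMap<>();

    // Assumed current behaviour: the vertex ID doubles as the operator ID.
    public static OperatorGroup getOrAddByName(String vertexId, String name) {
        return byName.computeIfAbsent(name, n -> new OperatorGroup(vertexId, n));
    }

    // Proposed behaviour: every operator brings its own ID.
    public static OperatorGroup getOrAddById(String operatorId, String name) {
        return byId.computeIfAbsent(operatorId, id -> new OperatorGroup(id, name));
    }

    public static void main(String[] args) {
        // Two chained operators in the same vertex, both named "Map".
        OperatorGroup first = getOrAddByName("vertex-1", "Map");
        OperatorGroup second = getOrAddByName("vertex-1", "Map");
        System.out.println(first == second);    // true: both operators share one group
        System.out.println(first.operatorId);   // vertex-1

        OperatorGroup a = getOrAddById("op-1", "Map");
        OperatorGroup b = getOrAddById("op-2", "Map");
        System.out.println(a == b);             // false: each operator keeps its own group
    }
}
{code}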

Proposal:

We could generate a dedicated OperatorID for each DataSet operator in JobGraphGenerator.compileJobGraph and pass it to the chained task's config:
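{code:java}
// In JobGraphGenerator.compileJobGraph: give each chained DataSet operator its own OperatorID.
// nodeId2OperatorId is assumed to be a new Map<Integer, OperatorID> field, so each optimizer
// node receives exactly one ID even if it is visited more than once.
for (TaskInChain tic : this.chainedTasksInSequence) {
    TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
    Integer nodeId = tic.getPlanNode().getOptimizerNode().getId();
    OperatorID operatorID = this.nodeId2OperatorId.computeIfAbsent(nodeId, id -> new OperatorID());
    // The trailing operatorID argument is the proposed addition to addChainedTask.
    t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName(), operatorID.toString());
}
{code}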
Each operator could then obtain its own ID from the TaskInfo.
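The memoization in the proposal can be tried in isolation. This sketch assumes java.util.UUID as a stand-in for Flink's OperatorID and a plain int as the optimizer node ID; the class OperatorIdAssignmentSketch and its operatorIdFor method are hypothetical. It shows the two properties the proposal relies on: different optimizer nodes receive different IDs, and revisiting a node returns the ID it was given before.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the nodeId -> OperatorID mapping; UUID stands in for OperatorID.
public class OperatorIdAssignmentSketch {

    // One ID per optimizer node, reused whenever the same node is seen again.
    private final Map<Integer, UUID> nodeId2OperatorId = new HashMap<>();

    // Same effect as the get / null check / put sequence in the proposal.
    public UUID operatorIdFor(int nodeId) {
        return nodeId2OperatorId.computeIfAbsent(nodeId, id -> UUID.randomUUID());
    }

    public static void main(String[] args) {
        OperatorIdAssignmentSketch sketch = new OperatorIdAssignmentSketch();
        // Two chained operators that may share a name but come from different optimizer nodes.
        UUID first = sketch.operatorIdFor(7);
        UUID second = sketch.operatorIdFor(8);
        UUID again = sketch.operatorIdFor(7);

        System.out.println(first.equals(second)); // distinct nodes get distinct IDs
        System.out.println(first.equals(again));  // the same node keeps its ID
    }
}
{code}

computeIfAbsent folds the lookup, null check and insertion into one call; a plain HashMap is enough as long as the job graph is compiled on a single thread, which is an assumption here.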

  was:
Currently, the OperatorMetricGroup of a batch job uses the VertexId as its OperatorId. Chained operators therefore share the same ID, and two chained operators with the same name collide. We could update JobGraphGenerator.compileJobGraph as follows:
{code:java}
for (TaskInChain tic : this.chainedTasksInSequence) {
   TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
   Integer nodeId = tic.getPlanNode().getOptimizerNode().getId();
   OperatorID operatorID = this.nodeId2OperatorId.get(nodeId);
   if (operatorID == null) {
      operatorID = new OperatorID();
      this.nodeId2OperatorId.put(nodeId, operatorID);
   }
   t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName(), operatorID.toString());
}
{code}
Each operator could then obtain its own ID from the TaskInfo.


> Create OperatorID for OperatorMetricGroup which in batch job 
> -------------------------------------------------------------
>
>                 Key: FLINK-14740
>                 URL: https://issues.apache.org/jira/browse/FLINK-14740
>             Project: Flink
>          Issue Type: Wish
>          Components: Runtime / Metrics
>            Reporter: lining
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
