Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/313#discussion_r61673039
  
    --- Diff: 
engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -778,32 +799,162 @@ public void 
resetStreamPersistanceOnSinkRemoval(InputPortMeta sinkBeingRemoved)
         }
       }
     
    +  public class DagNodeMeta implements Serializable
    +  {
    +    protected final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams 
= new LinkedHashMap<>();
    +    protected final LinkedHashMap<OutputPortMeta, StreamMeta> 
outputStreams = new LinkedHashMap<>();
    +    @NotNull
    +    protected final String name;
    +    protected transient Vertex node;
    +
    +    /*
    +     * Used for  OIO validation,
    +     *  value null => node not visited yet
    +     *  other value => represents the root oio node for this node
    +     */
    +    protected transient Integer oioRoot = null;
    +    public DagNodeMeta(String name, Vertex node)
    +    {
    +      this.name = name;
    +      this.node = node;
    +    }
    +
    +    public String getName()
    +    {
    +      return name;
    +    }
    +
    +    public Vertex getNode()
    +    {
    +      return node;
    +    }
    +
    +    private class PortMapping implements Operators.OperatorDescriptor
    +    {
    +      private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = 
new HashMap<>();
    +      private final Map<Operator.OutputPort<?>, OutputPortMeta> outPortMap 
= new HashMap<>();
    +      private final Map<String, Object> portNameMap = new HashMap<>();
    +
    +      @Override
    +      public void addInputPort(InputPort<?> portObject, Field field, 
InputPortFieldAnnotation portAnnotation, AppData.QueryPort adqAnnotation)
    +      {
    +        if (!DagNodeMeta.this.inputStreams.isEmpty()) {
    +          for (Map.Entry<LogicalPlan.InputPortMeta, 
LogicalPlan.StreamMeta> e : DagNodeMeta.this.inputStreams.entrySet()) {
    +            LogicalPlan.InputPortMeta pm = e.getKey();
    +            if (pm.operatorMeta == DagNodeMeta.this && 
pm.fieldName.equals(field.getName())) {
    +              //LOG.debug("Found existing port meta for: " + field);
    +              inPortMap.put(portObject, pm);
    +              markInputPortIfHidden(pm.getPortName(), pm, 
field.getDeclaringClass());
    +              return;
    +            }
    +          }
    +        }
    +        InputPortMeta metaPort = new InputPortMeta();
    +        metaPort.operatorMeta = DagNodeMeta.this;
    +        metaPort.fieldName = field.getName();
    +        metaPort.portAnnotation = portAnnotation;
    +        metaPort.adqAnnotation = adqAnnotation;
    +        inPortMap.put(portObject, metaPort);
    +        markInputPortIfHidden(metaPort.getPortName(), metaPort, 
field.getDeclaringClass());
    +      }
    +
    +      @Override
    +      public void addOutputPort(OutputPort<?> portObject, Field field, 
OutputPortFieldAnnotation portAnnotation, AppData.ResultPort adrAnnotation)
    +      {
    +        if (!DagNodeMeta.this.outputStreams.isEmpty()) {
    +          for (Map.Entry<LogicalPlan.OutputPortMeta, 
LogicalPlan.StreamMeta> e : DagNodeMeta.this.outputStreams.entrySet()) {
    +            LogicalPlan.OutputPortMeta pm = e.getKey();
    +            if (pm.operatorMeta == DagNodeMeta.this && 
pm.fieldName.equals(field.getName())) {
    +              //LOG.debug("Found existing port meta for: " + field);
    +              outPortMap.put(portObject, pm);
    +              markOutputPortIfHidden(pm.getPortName(), pm, 
field.getDeclaringClass());
    +              return;
    +            }
    +          }
    +        }
    +        OutputPortMeta metaPort = new OutputPortMeta();
    +        metaPort.operatorMeta = DagNodeMeta.this;
    +        metaPort.fieldName = field.getName();
    +        metaPort.portAnnotation = portAnnotation;
    +        metaPort.adrAnnotation = adrAnnotation;
    +        outPortMap.put(portObject, metaPort);
    +        markOutputPortIfHidden(metaPort.getPortName(), metaPort, 
field.getDeclaringClass());
    +      }
    +
    +      private void markOutputPortIfHidden(String portName, OutputPortMeta 
portMeta, Class<?> declaringClass)
    +      {
    +        if (!portNameMap.containsKey(portName)) {
    +          portNameMap.put(portName, portMeta);
    +        } else {
    +          // make the port optional
    +          portMeta.classDeclaringHiddenPort = declaringClass;
    +        }
    +
    +      }
    +
    +      private void markInputPortIfHidden(String portName, InputPortMeta 
portMeta, Class<?> declaringClass)
    +      {
    +        if (!portNameMap.containsKey(portName)) {
    +          portNameMap.put(portName, portMeta);
    +        } else {
    +          // make the port optional
    +          portMeta.classDeclaringHiddenPort = declaringClass;
    +        }
    +      }
    +    }
    +
    +    /**
    +     * Ports objects are transient, we keep a lazy initialized mapping
    +     */
    +    private transient PortMapping portMapping = null;
    +
    +    protected PortMapping getPortMapping()
    +    {
    +      if (this.portMapping == null) {
    +        this.portMapping = new PortMapping();
    +        Operators.describe(this.getNode(), portMapping);
    +      }
    +      return portMapping;
    +    }
    +
    +    public OutputPortMeta getMeta(Operator.OutputPort<?> port)
    +    {
    +      return getPortMapping().outPortMap.get(port);
    +    }
    +
    +    public InputPortMeta getMeta(Operator.InputPort<?> port)
    +    {
    +      return getPortMapping().inPortMap.get(port);
    +    }
    +
    +    public Map<InputPortMeta, StreamMeta> getInputStreams()
    +    {
    +      return this.inputStreams;
    +    }
    +
    +    public Map<OutputPortMeta, StreamMeta> getOutputStreams()
    +    {
    +      return this.outputStreams;
    +    }
    +
    +  }
    +
       /**
        * Operator meta object.
        */
    -  public final class OperatorMeta implements DAG.OperatorMeta, Serializable
    +  public class OperatorMeta extends DagNodeMeta implements 
DAG.OperatorMeta, Serializable
    --- End diff --
    
    Not clear why we need this distinction and why some of the fields are 
present here and not in "DagNodeMeta".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to