[ https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083780#comment-15083780 ]

ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48895870
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -212,30 +225,60 @@ public final void run()
         long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
         final boolean handleIdleTime = operator instanceof IdleTimeHandler;
         int totalQueues = inputs.size();
    +    int regularQueues = totalQueues;
    +    // regularQueues is the number of queues that are not connected to a DelayOperator
    +    for (String portName : inputs.keySet()) {
    +      if (isInputPortConnectedToDelayOperator(portName)) {
    +        regularQueues--;
    +      }
    +    }
     
    -    ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>();
    -    activeQueues.addAll(inputs.values());
    +    ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>();
    +    activeQueues.addAll(inputs.entrySet());
     
         int expectingBeginWindow = activeQueues.size();
         int receivedEndWindow = 0;
    +    long firstWindowId = -1;
     
         TupleTracker tracker;
         LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
    -
         try {
           do {
    -        Iterator<SweepableReservoir> buffers = activeQueues.iterator();
    +        Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
       activequeue:
             while (buffers.hasNext()) {
    -          SweepableReservoir activePort = buffers.next();
    +          Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next();
    +          SweepableReservoir activePort = activePortEntry.getValue();
               Tuple t = activePort.sweep();
    +          boolean needResetWindow = false;
               if (t != null) {
    +            boolean delay = (operator instanceof Operator.DelayOperator);
    +            long windowAhead = 0;
    +            if (delay) {
    +              windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1);
    +              if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) {
    --- End diff --
    
    fixed


> Iterative processing support
> ----------------------------
>
>                 Key: APEXCORE-60
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-60
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>              Labels: roadmap
>             Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the 
> graph (currently a DAG, but no longer one once cycles are allowed).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat} 
> C has two separate backward streams, one to A and one to B.  The input ports 
> of A and B that C connects to will have a special attribute specifying how 
> many window IDs ahead the incoming windows should be treated as, and A and B 
> will be responsible for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have 
> C generate the initial data on its output ports to A and B.
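
The window-advance idea in the description above can be illustrated with a
minimal sketch. Everything here is hypothetical and is not Apex code: window
IDs are modeled as plain counters (the real engine encodes them differently),
the A -> B -> C chain is collapsed into one addition, and the names
WindowDelaySketch, aheadWindowId and runLoop are made up for illustration.
The sketch only shows that data fed back from window w is consumed in window
w + 1, and that the first window needs initial data because nothing has been
fed back yet.

```java
public class WindowDelaySketch {

  // Hypothetical helper: treat a window ID as a plain counter and move it
  // forward by the given number of windows. Not the real WindowGenerator API.
  public static long aheadWindowId(long windowId, int windowsAhead) {
    return windowId + windowsAhead;
  }

  // Simulates a loop where the result of window w is fed back and consumed
  // in window w + 1. initialFeedback stands in for the "initial data" that
  // the first window needs.
  public static long[] runLoop(int windows, long initialFeedback) {
    long[] results = new long[windows];
    long feedback = initialFeedback;
    long feedbackWindow = 0; // window in which the pending feedback is due

    for (long w = 0; w < windows; w++) {
      if (feedbackWindow != w) {
        throw new IllegalStateException("feedback arrived for the wrong window");
      }
      // Stand-in for the A -> B -> C pipeline: combine this window's input
      // (here just the window number) with the fed-back value.
      long out = w + feedback;
      results[(int) w] = out;

      // The backward stream: the value emitted in window w is treated as
      // belonging to the window one ahead.
      feedback = out;
      feedbackWindow = aheadWindowId(w, 1);
    }
    return results;
  }

  public static void main(String[] args) {
    for (long r : runLoop(4, 0)) {
      System.out.println(r);
    }
  }
}
```

Whether the advance happens on the receiving input port (first idea) or on
the sending output port (second idea) does not change this arithmetic; it
only changes which operator is responsible for the initial data.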



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
