[ https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083780#comment-15083780 ]
ASF GitHub Bot commented on APEXCORE-60: ---------------------------------------- Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/incubator-apex-core/pull/185#discussion_r48895870 --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java --- @@ -212,30 +225,60 @@ public final void run() long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS); final boolean handleIdleTime = operator instanceof IdleTimeHandler; int totalQueues = inputs.size(); + int regularQueues = totalQueues; + // regularQueues is the number of queues that are not connected to a DelayOperator + for (String portName : inputs.keySet()) { + if (isInputPortConnectedToDelayOperator(portName)) { + regularQueues--; + } + } - ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>(); - activeQueues.addAll(inputs.values()); + ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>(); + activeQueues.addAll(inputs.entrySet()); int expectingBeginWindow = activeQueues.size(); int receivedEndWindow = 0; + long firstWindowId = -1; TupleTracker tracker; LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>(); - try { do { - Iterator<SweepableReservoir> buffers = activeQueues.iterator(); + Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator(); activequeue: while (buffers.hasNext()) { - SweepableReservoir activePort = buffers.next(); + Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next(); + SweepableReservoir activePort = activePortEntry.getValue(); Tuple t = activePort.sweep(); + boolean needResetWindow = false; if (t != null) { + boolean delay = (operator instanceof Operator.DelayOperator); + long windowAhead = 0; + if (delay) { + windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1); + if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) { --- End diff -- fixed > Iterative processing support > ---------------------------- > > Key: APEXCORE-60 > URL: https://issues.apache.org/jira/browse/APEXCORE-60 > Project: Apache Apex Core > Issue Type: New Feature > Reporter: David Yan > Assignee: David Yan > Labels: roadmap > Fix For: 3.3.0 > > > We would like to support iterative processing by introducing cycles in the > graph (known as DAG now, but no longer if we support iterative processing). > Initial idea is as follow: > {noformat} > |----| > v | > A -> B -> C -> D > ^ | > |---------| > {noformat} > C has two separate backward streams to A and B. The input ports of A and B > that C connects to will have a special attribute on how many window IDs ahead > the incoming windows should be treated as, and A and B will be responsible > for the initial data for such input ports. > Another idea is to have C advance the window ID on its output ports and have > C generate the initial data on its output ports to A and B. -- This message was sent by Atlassian JIRA (v6.3.4#6332)