richardstartin commented on a change in pull request #7450:
URL: https://github.com/apache/pinot/pull/7450#discussion_r711524613



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
##########
@@ -71,30 +70,19 @@ public String getOperatorName() {
 
   @Override
   protected void processSegments(int threadIndex) {
-    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; 
operatorIndex += _numTasks) {
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; 
operatorIndex += _numThreads) {
       Operator<IntermediateResultsBlock> operator = 
_operators.get(operatorIndex);
-      try {
-        IntermediateResultsBlock resultsBlock;
-        while ((resultsBlock = operator.nextBlock()) != null) {
-          Collection<Object[]> rows = resultsBlock.getSelectionResult();
-          assert rows != null;
-          long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
-          _blockingQueue.offer(resultsBlock);
-          if (numRowsCollected >= _limit) {
-            return;
-          }
+      IntermediateResultsBlock resultsBlock;
+      while ((resultsBlock = operator.nextBlock()) != null) {
+        Collection<Object[]> rows = resultsBlock.getSelectionResult();
+        assert rows != null;
+        long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
+        _blockingQueue.offer(resultsBlock);

Review comment:
       This logic is wrong - `LinkedBlockingQueue.offer` can return false, but 
is used as if the offer must succeed in several places, here, the number of 
rows collected should only be increased when the offer succeeds (though I 
suspect all of this code is written under the conception that the offers can't 
fail).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to