richardstartin commented on a change in pull request #7450:
URL: https://github.com/apache/pinot/pull/7450#discussion_r711524613
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
##########
@@ -71,30 +70,19 @@ public String getOperatorName() {
   @Override
   protected void processSegments(int threadIndex) {
-    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
       Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex);
-      try {
-        IntermediateResultsBlock resultsBlock;
-        while ((resultsBlock = operator.nextBlock()) != null) {
-          Collection<Object[]> rows = resultsBlock.getSelectionResult();
-          assert rows != null;
-          long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
-          _blockingQueue.offer(resultsBlock);
-          if (numRowsCollected >= _limit) {
-            return;
-          }
+      IntermediateResultsBlock resultsBlock;
+      while ((resultsBlock = operator.nextBlock()) != null) {
+        Collection<Object[]> rows = resultsBlock.getSelectionResult();
+        assert rows != null;
+        long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
+        _blockingQueue.offer(resultsBlock);

Review comment:
This logic is wrong - `LinkedBlockingQueue.offer` can return false, but it is used in several places as if the offer must succeed. Here, the number of rows collected should only be increased when the offer succeeds (though I suspect all of this code was written under the assumption that offers can't fail).
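
To illustrate the point about `offer`, here is a minimal stand-alone sketch, not the Pinot operator itself. The class `OfferSketch` and the helper `offerAndCount` are hypothetical names, and the queue is given a capacity of 1 purely to force a failed offer; whether the operator's queue is actually bounded is not visible in this hunk. The only thing it is meant to show is the counter being incremented after a successful offer rather than before it.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone sketch; names and types are illustrative only.
class OfferSketch {

  // Offers a block of rows and counts them only if the queue accepted the block.
  // Returns false (and leaves the counter untouched) when the queue is full.
  static boolean offerAndCount(BlockingQueue<List<Object[]>> queue, AtomicLong numRowsCollected,
      List<Object[]> rows) {
    if (queue.offer(rows)) {
      numRowsCollected.addAndGet(rows.size());
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    // Capacity 1 is an assumption made here so that the second offer fails.
    BlockingQueue<List<Object[]>> queue = new LinkedBlockingQueue<>(1);
    AtomicLong count = new AtomicLong();
    List<Object[]> rows = Collections.singletonList(new Object[] {1, "a"});

    System.out.println(offerAndCount(queue, count, rows)); // true: queue had room
    System.out.println(offerAndCount(queue, count, rows)); // false: queue is full
    System.out.println(count.get()); // 1: only the accepted block was counted
  }
}
```

What the caller should do on a `false` return (retry, block with `put`, or stop early) is a separate design decision that this sketch does not take a position on.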