gianm commented on code in PR #14196:
URL: https://github.com/apache/druid/pull/14196#discussion_r1246855498


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -309,30 +386,43 @@ private ReturnOrAwait<Long> nextAwait()
     final IntSet awaitSet = new IntOpenHashSet();
     int trackerAtLimit = -1;
 
+    // Add all trackers that "needsMoreData" to awaitSet.
     for (int i = 0; i < inputChannels.size(); i++) {
       final Tracker tracker = trackers.get(i);
-      if (tracker.needsMoreData()) {
-        if (tracker.totalBytesBuffered() < Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+      if (tracker.needsMoreDataForCurrentCursor()) {
+        if (tracker.canBufferMoreFrames()) {
           awaitSet.add(i);
         } else if (trackerAtLimit < 0) {
           trackerAtLimit = i;
         }
       }
     }
 
-    if (awaitSet.isEmpty() && trackerAtLimit > 0) {
+    if (awaitSet.isEmpty()) {
+      // No tracker reported that it "needsMoreData" to read the current cursor. However, we may still need to read
+      // more data to have a complete set for the current mark.
+      for (int i = 0; i < inputChannels.size(); i++) {
+        final Tracker tracker = trackers.get(i);
+        if (!tracker.hasCompleteSetForMark()) {
+          if (tracker.canBufferMoreFrames()) {
+            awaitSet.add(i);
+          } else if (trackerAtLimit < 0) {
+            trackerAtLimit = i;
+          }
+        }
+      }
+    }
+
+    if (awaitSet.isEmpty() && trackerAtLimit >= 0) {
       // All trackers that need more data are at their max buffered bytes limit. Generate a nice exception.
       final Tracker tracker = trackers.get(trackerAtLimit);
-      if (tracker.totalBytesBuffered() > Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
-        // Generate a nice exception.
-        throw new MSQException(
-            new TooManyRowsWithSameKeyFault(
-                tracker.readMarkKey(),
-                tracker.totalBytesBuffered(),
-                Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN
-            )
-        );
-      }
+      throw new MSQException(
+          new TooManyRowsWithSameKeyFault(
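
The hunk above changes how the processor picks which inputs to wait on: first it looks at inputs that need more data to read their current cursor, and only if none qualify does it look at inputs whose mark does not yet hold a complete set of rows. Below is a minimal, self-contained sketch of that two-pass selection, not Druid's actual code. `SketchTracker`, `AwaitSelection`, and the `tracker(...)` factory are hypothetical names; the three predicate names mirror the diff; a `TreeSet<Integer>` stands in for fastutil's `IntOpenHashSet`; and `IllegalStateException` stands in for `MSQException` wrapping `TooManyRowsWithSameKeyFault`.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for the per-input Tracker; predicate names mirror the diff. */
interface SketchTracker {
  boolean needsMoreDataForCurrentCursor();

  boolean hasCompleteSetForMark();

  boolean canBufferMoreFrames();
}

final class AwaitSelection {
  private AwaitSelection() {}

  /** Hypothetical factory so the example can build trackers with fixed answers. */
  static SketchTracker tracker(boolean needsCursorData, boolean completeMark, boolean canBuffer) {
    return new SketchTracker() {
      @Override
      public boolean needsMoreDataForCurrentCursor() {
        return needsCursorData;
      }

      @Override
      public boolean hasCompleteSetForMark() {
        return completeMark;
      }

      @Override
      public boolean canBufferMoreFrames() {
        return canBuffer;
      }
    };
  }

  /** Returns input indexes to wait on; throws if every input that needs data is at its limit. */
  static Set<Integer> chooseAwaitSet(List<? extends SketchTracker> trackers) {
    final Set<Integer> awaitSet = new TreeSet<>();
    int trackerAtLimit = -1;

    // Pass 1: inputs that cannot read their current cursor without more frames.
    for (int i = 0; i < trackers.size(); i++) {
      final SketchTracker t = trackers.get(i);
      if (t.needsMoreDataForCurrentCursor()) {
        if (t.canBufferMoreFrames()) {
          awaitSet.add(i);
        } else if (trackerAtLimit < 0) {
          trackerAtLimit = i;
        }
      }
    }

    // Pass 2: every cursor is readable, but a mark may not yet hold all rows for its key.
    if (awaitSet.isEmpty()) {
      for (int i = 0; i < trackers.size(); i++) {
        final SketchTracker t = trackers.get(i);
        if (!t.hasCompleteSetForMark()) {
          if (t.canBufferMoreFrames()) {
            awaitSet.add(i);
          } else if (trackerAtLimit < 0) {
            trackerAtLimit = i;
          }
        }
      }
    }

    // ">= 0" rather than "> 0": the input at index 0 can also be the one at its limit.
    if (awaitSet.isEmpty() && trackerAtLimit >= 0) {
      // Stand-in for MSQException(new TooManyRowsWithSameKeyFault(...)).
      throw new IllegalStateException("Input " + trackerAtLimit + " is at its buffered-bytes limit");
    }
    return awaitSet;
  }
}
```

The `>= 0` comparison reflects the fix in the hunk: with the old `> 0`, an input at index 0 hitting its limit would never produce the error.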

Review Comment:
   I added:
   
   > Try increasing heap memory available to workers, or adjusting your query to process fewer rows with this key.
   
   I also added some code to actually make the max buffered bytes limit scale with heap size. Previously it was hard-coded at 10MB.
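
The comment does not show the formula used to scale the limit, so the following is only a sketch of one way a heap-scaled limit could work. The class name, the 5% heap fraction, and the choice to keep the old fixed value as a floor are all hypothetical; the `canBufferMoreFrames` helper reuses the `totalBytesBuffered() < limit` comparison from the removed diff lines.

```java
/** Hypothetical sketch: a sort-merge join buffering limit that scales with worker heap size. */
final class HeapScaledLimit {
  // Hypothetical stand-in for the old hard-coded ~10MB limit, kept here as a floor.
  static final long LEGACY_FIXED_LIMIT_BYTES = 10L * 1024 * 1024;

  // Hypothetical share of the heap one input may spend on frames buffered for a single key.
  static final double HEAP_FRACTION = 0.05;

  private HeapScaledLimit() {}

  /** Grows linearly with maxHeapBytes, but never falls below the legacy fixed limit. */
  static long maxBufferedBytes(long maxHeapBytes) {
    if (maxHeapBytes <= 0) {
      throw new IllegalArgumentException("maxHeapBytes must be positive: " + maxHeapBytes);
    }
    final long scaled = (long) (maxHeapBytes * HEAP_FRACTION);
    return Math.max(LEGACY_FIXED_LIMIT_BYTES, scaled);
  }

  /** Uses Runtime.maxMemory() (typically governed by -Xmx) as the heap size. */
  static long maxBufferedBytesForThisJvm() {
    return maxBufferedBytes(Runtime.getRuntime().maxMemory());
  }

  /** Same comparison as the removed totalBytesBuffered() < limit check in the diff. */
  static boolean canBufferMoreFrames(long totalBytesBuffered, long limitBytes) {
    return totalBytesBuffered < limitBytes;
  }
}
```

With these assumed numbers, a 64 MiB heap stays at the 10 MiB floor, while an 8 GiB heap allows roughly 410 MiB of buffered frames per input.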



-- 
This is an automated message from the Apache Git Service.