LakshSingla commented on code in PR #14196:
URL: https://github.com/apache/druid/pull/14196#discussion_r1246174349


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -309,30 +386,43 @@ private ReturnOrAwait<Long> nextAwait()
     final IntSet awaitSet = new IntOpenHashSet();
     int trackerAtLimit = -1;
 
+    // Add all trackers that "needsMoreData" to awaitSet.
     for (int i = 0; i < inputChannels.size(); i++) {
       final Tracker tracker = trackers.get(i);
-      if (tracker.needsMoreData()) {
-        if (tracker.totalBytesBuffered() < Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+      if (tracker.needsMoreDataForCurrentCursor()) {
+        if (tracker.canBufferMoreFrames()) {
           awaitSet.add(i);
         } else if (trackerAtLimit < 0) {
           trackerAtLimit = i;
         }
       }
     }
 
-    if (awaitSet.isEmpty() && trackerAtLimit > 0) {
+    if (awaitSet.isEmpty()) {
+      // No tracker reported that it "needsMoreData" to read the current cursor. However, we may still need to read
+      // more data to have a complete set for the current mark.
+      for (int i = 0; i < inputChannels.size(); i++) {
+        final Tracker tracker = trackers.get(i);
+        if (!tracker.hasCompleteSetForMark()) {
+          if (tracker.canBufferMoreFrames()) {
+            awaitSet.add(i);
+          } else if (trackerAtLimit < 0) {
+            trackerAtLimit = i;
+          }
+        }
+      }
+    }
+
+    if (awaitSet.isEmpty() && trackerAtLimit >= 0) {
       // All trackers that need more data are at their max buffered bytes limit. Generate a nice exception.
       final Tracker tracker = trackers.get(trackerAtLimit);
-      if (tracker.totalBytesBuffered() > Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
-        // Generate a nice exception.
-        throw new MSQException(
-            new TooManyRowsWithSameKeyFault(
-                tracker.readMarkKey(),
-                tracker.totalBytesBuffered(),
-                Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN
-            )
-        );
-      }
+      throw new MSQException(
+          new TooManyRowsWithSameKeyFault(

Review Comment:
   In the spirit of the improved `DruidExceptions`, the error message of `TooManyRowsWithSameKeyFault` might not make sense to the user running the query.
   We should mention alternatives the user can try:
   1. Break up the job into smaller chunks
   2. Increase the heap size for the job
   3. Use the broadcast join algorithm (maybe?)
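
   To illustrate the suggestions above, here is a minimal sketch of a message that carries them. The class `SameKeyLimitMessage` and its `build` method are hypothetical and not part of Druid or this PR; the wording, and whether each alternative actually applies, would need to be confirmed against the real fault class.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not Druid API: builds a user-facing message for the
// "too many rows with the same key" condition and lists possible next steps.
final class SameKeyLimitMessage
{
  private SameKeyLimitMessage()
  {
    // Static helper only; no instances.
  }

  static String build(List<Object> key, long bytesBuffered, long maxBytes)
  {
    return String.format(
        Locale.ROOT,
        "Too many rows with the same key %s during sort-merge join "
        + "(bytes buffered = %d, limit = %d). "
        + "Try breaking the job into smaller chunks, increasing the heap size for the job, "
        + "or using the broadcast join algorithm.",
        key,
        bytesBuffered,
        maxBytes
    );
  }
}
```

   Keeping the key and the byte counts in the message preserves the diagnostic detail, while the final sentence tells the user what they can do about it.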



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -353,7 +443,9 @@ private boolean allTrackersAreAtEnd()
   }
 
   /**
-   * Compares the marked rows of the two {@link #trackers}.
+   * Compares the marked rows of the two {@link #trackers}. This method returns 0 if both sides are null, even

Review Comment:
   nit: We should also mention that this method returns -1 if we should consider the LEFT row and 1 if we should consider the RIGHT row. It might be a good refactor to use an enum instead of `int` to denote the three outcomes of `compareMarks`, since, as I understand it, the semantics differ slightly from those of an ordinary comparison.
   
   Also, a typical comparison (e.g. Java's `compareTo`) returns something like (left - right), while here we also take the ascending or descending order of the columns into account.
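
   A rough sketch of the enum idea, under the assumption that the three outcomes are "consider the left row", "match", and "consider the right row". The name `MarkComparison` and the `of` factory are hypothetical and not taken from the PR; the sketch only shows how a natural comparison result and the column direction could be folded into one named outcome.

```java
// Hypothetical sketch, not code from the PR: names are illustrative only.
enum MarkComparison
{
  CONSIDER_LEFT,   // would correspond to compareMarks returning a negative value
  MATCH,           // would correspond to compareMarks returning 0
  CONSIDER_RIGHT;  // would correspond to compareMarks returning a positive value

  /**
   * Maps a natural comparison result (as from compareTo) to an outcome,
   * flipping the direction when the column is sorted in descending order.
   */
  static MarkComparison of(int naturalComparison, boolean descending)
  {
    // Take the sign first so that negating Integer.MIN_VALUE cannot overflow.
    final int sign = Integer.signum(naturalComparison);
    final int ordered = descending ? -sign : sign;

    if (ordered < 0) {
      return CONSIDER_LEFT;
    } else if (ordered > 0) {
      return CONSIDER_RIGHT;
    } else {
      return MATCH;
    }
  }
}
```

   With named outcomes, callers can switch on the result instead of reasoning about the sign of an `int`, and the sort direction is handled in a single place.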
   
   
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -533,6 +629,11 @@ public long totalBytesBuffered()
       return bytes;
     }
 
+    public boolean canBufferMoreFrames()

Review Comment:
   Can you please add a Javadoc for the method? I don't understand why `holders.size() <= 1` is required.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -166,10 +168,10 @@ public List<WritableFrameChannel> outputChannels()
   @Override
   public ReturnOrAwait<Long> runIncrementally(IntSet readableInputs) throws IOException
   {
-    // Fetch enough frames such that each tracker has one readable row.
+    // Fetch enough frames such that each tracker has one readable row (or is done).
     for (int i = 0; i < inputChannels.size(); i++) {
       final Tracker tracker = trackers.get(i);
-      if (tracker.isAtEndOfPushedData() && !pushNextFrame(i)) {
+      if (tracker.needsMoreDataForCurrentCursor() && !pushNextFrame(i)) {

Review Comment:
   Was this a regression in the original code that might have caused the processor to get stuck in some cases (in all cases, if one side of the join is empty)?
   
   (I am trying to understand the code, and this change makes sense, so I was wondering about its impact. If so, it's good that we caught it before it led us into a debugging maze.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

