LakshSingla commented on code in PR #14196:
URL: https://github.com/apache/druid/pull/14196#discussion_r1246174349
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -309,30 +386,43 @@ private ReturnOrAwait<Long> nextAwait()
final IntSet awaitSet = new IntOpenHashSet();
int trackerAtLimit = -1;
+ // Add all trackers that "needsMoreData" to awaitSet.
for (int i = 0; i < inputChannels.size(); i++) {
final Tracker tracker = trackers.get(i);
- if (tracker.needsMoreData()) {
- if (tracker.totalBytesBuffered() < Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+ if (tracker.needsMoreDataForCurrentCursor()) {
+ if (tracker.canBufferMoreFrames()) {
awaitSet.add(i);
} else if (trackerAtLimit < 0) {
trackerAtLimit = i;
}
}
}
- if (awaitSet.isEmpty() && trackerAtLimit > 0) {
+ if (awaitSet.isEmpty()) {
+ // No tracker reported that it "needsMoreData" to read the current cursor. However, we may still need to read
+ // more data to have a complete set for the current mark.
+ for (int i = 0; i < inputChannels.size(); i++) {
+ final Tracker tracker = trackers.get(i);
+ if (!tracker.hasCompleteSetForMark()) {
+ if (tracker.canBufferMoreFrames()) {
+ awaitSet.add(i);
+ } else if (trackerAtLimit < 0) {
+ trackerAtLimit = i;
+ }
+ }
+ }
+ }
+
+ if (awaitSet.isEmpty() && trackerAtLimit >= 0) {
// All trackers that need more data are at their max buffered bytes limit. Generate a nice exception.
final Tracker tracker = trackers.get(trackerAtLimit);
- if (tracker.totalBytesBuffered() > Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
- // Generate a nice exception.
- throw new MSQException(
- new TooManyRowsWithSameKeyFault(
- tracker.readMarkKey(),
- tracker.totalBytesBuffered(),
- Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN
- )
- );
- }
+ throw new MSQException(
+ new TooManyRowsWithSameKeyFault(
Review Comment:
In the spirit of the improved `DruidException`, the error message of `TooManyRowsWithSameKeyFault` might not make sense to the user running the query. We should suggest alternatives to the user:
1. Break up the job into smaller chunks
2. Increase the heap size for the job
3. Use the broadcast join algorithm instead (maybe?)
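To make the suggestion concrete, here is a minimal sketch of a user-facing message that states the fault details and then lists remedies. The class `TooManyRowsMessageSketch`, its method, and the exact wording are hypothetical; this is not Druid's `DruidException` or `TooManyRowsWithSameKeyFault` API, only plain JDK string formatting.

```java
/**
 * Hypothetical helper (not part of Druid): builds a user-facing message for the
 * "too many rows with the same key" condition and appends remedies the user can try.
 */
final class TooManyRowsMessageSketch
{
  // Remedies taken from the review comment; the wording is illustrative only.
  static final String[] SUGGESTIONS = {
      "Break up the query into smaller pieces that each process less data.",
      "Increase the heap size available to the job.",
      "If one side of the join is small, consider a broadcast join instead."
  };

  static String buildMessage(String key, long bytesBuffered, long limit)
  {
    final StringBuilder sb = new StringBuilder();
    // Describe what happened in terms the user can relate to the query.
    sb.append(String.format(
        "Too many rows with the same key [%s] on one side of a sort-merge join: "
        + "buffered [%d] bytes, limit is [%d] bytes. Try one of the following:",
        key, bytesBuffered, limit));
    // Number each remedy on its own line.
    for (int i = 0; i < SUGGESTIONS.length; i++) {
      sb.append(String.format("%n  %d. %s", i + 1, SUGGESTIONS[i]));
    }
    return sb.toString();
  }
}
```

Listing the remedies alongside the raw numbers keeps the message actionable for a query author who does not know how the join buffers rows.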
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -353,7 +443,9 @@ private boolean allTrackersAreAtEnd()
}
/**
- * Compares the marked rows of the two {@link #trackers}.
+ * Compares the marked rows of the two {@link #trackers}. This method returns 0 if both sides are null, even
Review Comment:
nit: We should also mention that the method returns -1 if we should consider the LEFT row and 1 if we should consider the RIGHT row. It might be a good refactor to use an enum instead of an `int` to denote the three outcomes of `compareMarks`, since, as I understand it, the semantics differ slightly from those of an actual comparison.
Also, a comparison (e.g. Java's `compareTo`) returns something like (left - right), while here we also take into account whether each column is sorted in ascending or descending order.
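A minimal sketch of the suggested enum, assuming (as in a standard sort-merge join) that the side whose key sorts first in the output order is the one to consider next. `MarkComparison` and `of` are hypothetical names, keys are modeled as `long[]` of equal length purely for illustration, and null handling is left out.

```java
/** Hypothetical outcome type for compareMarks, as suggested in the review; not Druid code. */
enum MarkComparison
{
  LEFT,   // the left marked row sorts first: consider the LEFT row next
  RIGHT,  // the right marked row sorts first: consider the RIGHT row next
  MATCH;  // the keys are equal: rows from both sides join

  /**
   * Compares two join keys column by column, honoring each column's sort direction.
   * Assumes left, right, and descending all have the same length.
   */
  static MarkComparison of(long[] left, long[] right, boolean[] descending)
  {
    for (int i = 0; i < left.length; i++) {
      int cmp = Long.compare(left[i], right[i]);
      if (descending[i]) {
        cmp = -cmp; // in a descending column, the larger value comes first
      }
      if (cmp < 0) {
        return LEFT;
      } else if (cmp > 0) {
        return RIGHT;
      }
    }
    return MATCH;
  }
}
```

Folding the sort direction into the enum's factory keeps callers from having to interpret the sign of an `int` that is not a plain (left - right) comparison.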
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -533,6 +629,11 @@ public long totalBytesBuffered()
return bytes;
}
+ public boolean canBufferMoreFrames()
Review Comment:
Can you please add a Javadoc for the method? I don't understand why `holders.size() <= 1` is required.
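As an illustration of the kind of Javadoc being requested, here is a sketch that assumes `canBufferMoreFrames()` combines the `holders.size() <= 1` clause with the byte-limit check it replaced in `nextAwait`. Both that combination and the rationale in the Javadoc are assumptions, not the PR's actual code; `TrackerSketch` and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the Tracker in SortMergeJoinFrameProcessor; not Druid code. */
final class TrackerSketch
{
  // Size in bytes of each frame currently held (one entry per "holder").
  private final List<Long> holderBytes = new ArrayList<>();
  private final long maxBufferedBytes;

  TrackerSketch(long maxBufferedBytes)
  {
    this.maxBufferedBytes = maxBufferedBytes;
  }

  void addFrame(long frameBytes)
  {
    holderBytes.add(frameBytes);
  }

  long totalBytesBuffered()
  {
    long bytes = 0;
    for (long b : holderBytes) {
      bytes += b;
    }
    return bytes;
  }

  /**
   * Returns whether this tracker may buffer another frame.
   *
   * <p>ASSUMED rationale for the {@code holders.size() <= 1} clause: a tracker holding at most
   * one frame may always read the next one, even if that single frame already exceeds the byte
   * limit; otherwise one oversized frame could stop the tracker from making progress. With two
   * or more frames held, buffering is allowed only while the total stays under the limit.
   */
  boolean canBufferMoreFrames()
  {
    return holderBytes.size() <= 1 || totalBytesBuffered() < maxBufferedBytes;
  }
}
```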
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -166,10 +168,10 @@ public List<WritableFrameChannel> outputChannels()
@Override
public ReturnOrAwait<Long> runIncrementally(IntSet readableInputs) throws IOException
{
- // Fetch enough frames such that each tracker has one readable row.
+ // Fetch enough frames such that each tracker has one readable row (or is done).
for (int i = 0; i < inputChannels.size(); i++) {
final Tracker tracker = trackers.get(i);
- if (tracker.isAtEndOfPushedData() && !pushNextFrame(i)) {
+ if (tracker.needsMoreDataForCurrentCursor() && !pushNextFrame(i)) {
Review Comment:
Was this a bug in the original code that could have caused the processor to get stuck in some cases (in all cases where one side of the join is empty)?
(I am trying to understand the code, and this change makes sense, so I was wondering about its impact. If it was a bug, it's good that we caught it before we went into a debugging maze.)
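To illustrate the scenario being asked about, here is a toy model contrasting the two conditions for an input that is empty and already finished. The class and its fields are hypothetical; only the method names mirror the diff, and the body of `needsMoreDataForCurrentCursor()` is an assumption inferred from the updated "(or is done)" comment.

```java
/**
 * Hypothetical model of a tracker's input state, contrasting the old and new conditions
 * in runIncrementally. Method names mirror the diff; their bodies are assumptions.
 */
final class TrackerStateSketch
{
  private final boolean pushedDataExhausted; // no unread rows remain in frames pushed so far
  private final boolean channelFinished;     // the input channel will produce no more frames

  TrackerStateSketch(boolean pushedDataExhausted, boolean channelFinished)
  {
    this.pushedDataExhausted = pushedDataExhausted;
    this.channelFinished = channelFinished;
  }

  /** Old condition: true whenever pushed data is exhausted, even if no more data can arrive. */
  boolean isAtEndOfPushedData()
  {
    return pushedDataExhausted;
  }

  /** Assumed new condition: more data is needed only if the channel can still supply it. */
  boolean needsMoreDataForCurrentCursor()
  {
    return pushedDataExhausted && !channelFinished;
  }
}
```

In this model, an empty side whose channel is finished still satisfies the old condition, so the loop would keep asking that channel for a frame it can never deliver; the new condition treats the tracker as done.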