cloud-fan commented on a change in pull request #32210:
URL: https://github.com/apache/spark/pull/32210#discussion_r619302789



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -475,18 +501,89 @@ private[joins] object UnsafeHashedRelation {
           key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
           row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
         if (!success) {
-          binaryMap.free()
-          throw QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
+          if (allowsFallbackWithNoMemory) {
+            return new UnfinishedUnsafeHashedRelation(numFields, binaryMap, row)
+          } else {
+            // Clean up map and throw exception
+            binaryMap.free()
+            throw QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
+          }
         }
       } else if (isNullAware) {
         return HashedRelationWithAllNullKeys
       }
+      i += 1
     }
 
     new UnsafeHashedRelation(key.size, numFields, binaryMap)
   }
 }
 
+/**
+ * An unfinished version of [[UnsafeHashedRelation]].
+ * This is intended to be used in the sort-based fallback of [[ShuffledHashJoinExec]],
+ * when there is not enough memory to build [[UnsafeHashedRelation]].
+ *
+ * @param numFields Number of fields in each row.
+ * @param binaryMap Backing [[BytesToBytesMap]] that holds the keys and rows.
+ * @param pendingRow The row which cannot be added to `binaryMap` due to the memory limit.
+ */
+private[joins] class UnfinishedUnsafeHashedRelation(
+    private val numFields: Int,
+    private val binaryMap: BytesToBytesMap,
+    private val pendingRow: UnsafeRow)
+  extends HashedRelation {
+
+  override def destructiveValues(): Iterator[InternalRow] = new Iterator[InternalRow] {

Review comment:
       Do we really need to define this function in the base class? It seems the caller side can just pattern match on this relation and call `destructiveValues`. It's also safer to do so, as `destructiveValues` in the other relations would just fail.
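
The suggestion above could be sketched roughly as follows. This is a hypothetical caller-side fragment, not code from the PR: the helper names `buildHashedRelation`, `fallbackToSortMergeJoin`, and `hashJoin` are made up for illustration; only `UnfinishedUnsafeHashedRelation`, `HashedRelation`, and `destructiveValues` come from the diff.

```scala
// Sketch: instead of exposing destructiveValues() on the HashedRelation base
// class (where most implementations would just fail), the caller pattern
// matches on the concrete fallback type. All helper names are hypothetical.
buildHashedRelation(buildIter) match {
  case unfinished: UnfinishedUnsafeHashedRelation =>
    // Could not acquire enough memory: drain the partially built map
    // destructively and fall back to a sort-based join.
    fallbackToSortMergeJoin(unfinished.destructiveValues(), streamIter)
  case relation: HashedRelation =>
    // Normal path: the relation was fully built in memory.
    hashJoin(relation, streamIter)
}
```

This keeps `destructiveValues` off the `HashedRelation` trait entirely, so a misplaced call on a relation that does not support it becomes a compile-time error rather than a runtime failure.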




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
