This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e4fe9baec [GLUTEN-5136][VL] Duplicated output from Spark-to-Velox broadcast relation conversion (#5141)
e4fe9baec is described below

commit e4fe9baeccde07e2938d5f186151c43591e91720
Author: Hongze Zhang <hongze.zh...@intel.com>
AuthorDate: Wed Mar 27 12:54:29 2024 +0800

    [GLUTEN-5136][VL] Duplicated output from Spark-to-Velox broadcast relation conversion (#5141)
---
 .../apache/spark/sql/execution/BroadcastUtils.scala    | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
index a0f28c5ab..ad7694ea2 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, 
BroadcastPartitioning, IdentityBroadcastMode, Partitioning}
-import org.apache.spark.sql.execution.joins.{HashedRelation, 
HashedRelationBroadcastMode}
+import org.apache.spark.sql.execution.joins.{HashedRelation, 
HashedRelationBroadcastMode, LongHashedRelation}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.TaskResources
@@ -96,9 +96,8 @@ object BroadcastUtils {
         // HashedRelation to ColumnarBuildSideRelation.
         val fromBroadcast = from.asInstanceOf[Broadcast[HashedRelation]]
         val fromRelation = fromBroadcast.value.asReadOnlyCopy()
-        val keys = fromRelation.keys()
         val toRelation = TaskResources.runUnsafe {
-          val batchItr: Iterator[ColumnarBatch] = fn(keys.flatMap(key => 
fromRelation.get(key)))
+          val batchItr: Iterator[ColumnarBatch] = 
fn(reconstructRows(fromRelation))
           val serialized: Array[Array[Byte]] = serializeStream(batchItr) match 
{
             case ColumnarBatchSerializeResult.EMPTY =>
               Array()
@@ -170,4 +169,17 @@ object BroadcastUtils {
       }
     serializeResult
   }
+
+  private def reconstructRows(relation: HashedRelation): Iterator[InternalRow] = {
+    // Rebuilds the stored rows. Applying keys().flatMap(get) to every relation type, as the
+    //  previous code did, duplicated output (GLUTEN-5136); hence Long relations go per key and
+    //  others via valuesWithKeyIndex(). NOTE(review): confirm per-type semantics in Spark.
+    relation match {
+      case relation: LongHashedRelation if relation.keyIsUnique =>
+        relation.keys().map(k => relation.getValue(k))
+      case relation: LongHashedRelation if !relation.keyIsUnique =>
+        relation.keys().flatMap(k => relation.get(k))
+      case other => other.valuesWithKeyIndex().map(_.getValue)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to