cloud-fan commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2097069114


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -356,6 +458,26 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       Project(projectList, scanRelation)
   }
 
+  def buildScanWithPushedJoin(plan: LogicalPlan): LogicalPlan = plan.transform 
{
+    case holder: ScanBuilderHolder if holder.isJoinPushed && 
!holder.isStreaming =>
+      val scan = holder.builder.build()
+      val realOutput = toAttributes(scan.readSchema())
+      assert(realOutput.length == holder.output.length,
+        "The data source returns unexpected number of columns")
+      val wrappedScan = getWrappedScan(scan, holder)
+      val scanRelation = DataSourceV2ScanRelation(holder.relation, 
wrappedScan, realOutput)
+
+      // When join is pushed down, the output of ScanBuilderHolder is going to 
be, for example,
+      // subquery_2_col_0#0, subquery_2_col_1#1, subquery_2_col_2#2.
+      // We should revert these names back to original names. For example,
+      // SALARY#0, NAME#1, DEPT#1. This is done by adding projection with 
appropriate aliases.
+      val projectList = realOutput.zip(holder.output).map { case (a1, a2) =>
+        val originalName = holder.exprIdToOriginalName(a2.exprId)
+        Alias(a1, originalName)(a2.exprId)

Review Comment:
   is `originalName` always `a2.name`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to