PetarVasiljevic-DB commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2197539031
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,112 @@ case class JDBCScanBuilder(
}
}
+ override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin):
Boolean = {
+ other.isInstanceOf[JDBCScanBuilder] &&
+ jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
Review Comment:
We can do that. I think it is better to be strict in the beginning and relax
the check in the future, rather than make the check stricter over time, since
tightening it later could break existing users.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,142 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}
+ def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ // Join can be attempted to be pushed down only if left and right side of
join are
+ // compatible (same data source, for example). Also, another requirement
is that if
+ // there are projections between Join and ScanBuilderHolder, these
projections need to be
+ // AttributeReferences. We could probably support Alias as well, but this
should be on
+ // TODO list.
+ // Alias can exist between Join and sHolder node because the query below
is not valid:
+ // SELECT * FROM
+ // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+ // JOIN
+ // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+ // ON p.t1.col = q.t3.col (this is not possible)
+ // It's because there are duplicated columns in both sides of top level
join and it's not
+ // possible to fully qualified the column names in condition. Therefore,
query should be
+ // rewritten so that each of the outputs of child joins are aliased, so
there would be a
+ // projection with aliases between top level join and scanBuilderHolder
(that has pushed
+ // child joins).
+ case node @ Join(
+ PhysicalOperation(
+ leftProjections,
+ Nil,
+ leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+ ),
+ PhysicalOperation(
+ rightProjections,
+ Nil,
+ rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+ ),
+ joinType,
+ condition,
+ _) if conf.dataSourceV2JoinPushdown &&
+ // We do not support pushing down anything besides AttributeReference.
+ leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ // Cross joins are not supported because they increase the amount of
data.
+ condition.isDefined &&
+ // Joins on top of sampled tables are not supported
+ leftHolder.pushedSample.isEmpty &&
+ rightHolder.pushedSample.isEmpty &&
+ lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+ val leftSideRequiredColumnNames =
getRequiredColumnNames(leftProjections, leftHolder)
+ val rightSideRequiredColumnNames =
getRequiredColumnNames(rightProjections, rightHolder)
+
+ // Alias the duplicated columns from left side of the join.
+ val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map
{ name =>
+ val aliasName =
+ if (leftSideRequiredColumnNames.count(_ == name) > 1 ||
+ rightSideRequiredColumnNames.contains(name)) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Aliasing of duplicated columns in right side is done only if there
are duplicates in
+ // right side only. There won't be a conflict with left side columns
because they are
+ // already aliased.
+ val rightSideRequiredColumnsWithAliases =
rightSideRequiredColumnNames.map { name =>
+ val aliasName =
+ if (rightSideRequiredColumnNames.count(_ == name) > 1) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Create the AttributeMap that holds (Attribute -> Attribute with up to
date name) mapping.
+ val pushedJoinOutputMap = AttributeMap[Expression](
+ node.output.asInstanceOf[Seq[AttributeReference]]
Review Comment:
We don't need the cast. We can just use `node.output`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,142 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}
+ def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ // Join can be attempted to be pushed down only if left and right side of
join are
+ // compatible (same data source, for example). Also, another requirement
is that if
+ // there are projections between Join and ScanBuilderHolder, these
projections need to be
+ // AttributeReferences. We could probably support Alias as well, but this
should be on
+ // TODO list.
+ // Alias can exist between Join and sHolder node because the query below
is not valid:
+ // SELECT * FROM
+ // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+ // JOIN
+ // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+ // ON p.t1.col = q.t3.col (this is not possible)
+ // It's because there are duplicated columns in both sides of top level
join and it's not
+ // possible to fully qualified the column names in condition. Therefore,
query should be
+ // rewritten so that each of the outputs of child joins are aliased, so
there would be a
+ // projection with aliases between top level join and scanBuilderHolder
(that has pushed
+ // child joins).
+ case node @ Join(
+ PhysicalOperation(
+ leftProjections,
+ Nil,
+ leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+ ),
+ PhysicalOperation(
+ rightProjections,
+ Nil,
+ rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+ ),
+ joinType,
+ condition,
+ _) if conf.dataSourceV2JoinPushdown &&
+ // We do not support pushing down anything besides AttributeReference.
+ leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ // Cross joins are not supported because they increase the amount of
data.
+ condition.isDefined &&
+ // Joins on top of sampled tables are not supported
+ leftHolder.pushedSample.isEmpty &&
+ rightHolder.pushedSample.isEmpty &&
+ lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+ val leftSideRequiredColumnNames =
getRequiredColumnNames(leftProjections, leftHolder)
+ val rightSideRequiredColumnNames =
getRequiredColumnNames(rightProjections, rightHolder)
+
+ // Alias the duplicated columns from left side of the join.
+ val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map
{ name =>
+ val aliasName =
+ if (leftSideRequiredColumnNames.count(_ == name) > 1 ||
+ rightSideRequiredColumnNames.contains(name)) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Aliasing of duplicated columns in right side is done only if there
are duplicates in
+ // right side only. There won't be a conflict with left side columns
because they are
+ // already aliased.
+ val rightSideRequiredColumnsWithAliases =
rightSideRequiredColumnNames.map { name =>
+ val aliasName =
+ if (rightSideRequiredColumnNames.count(_ == name) > 1) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Create the AttributeMap that holds (Attribute -> Attribute with up to
date name) mapping.
+ val pushedJoinOutputMap = AttributeMap[Expression](
+ node.output.asInstanceOf[Seq[AttributeReference]]
+ .zip(leftSideRequiredColumnsWithAliases ++
rightSideRequiredColumnsWithAliases)
+ .collect {
+ case (attr, columnWithAlias) if columnWithAlias.alias() != null =>
+ (attr, attr.withName(columnWithAlias.alias()))
+ }
+ .toMap
+ )
+
+ // Reuse the previously calculated map to update the condition with
attributes
+ // with up-to-date names
+ val normalizedCondition = condition.map { e =>
+ DataSourceStrategy.normalizeExprs(
+ Seq(e),
+ (leftHolder.output ++ rightHolder.output).map { a =>
+ pushedJoinOutputMap.getOrElse(a,
a).asInstanceOf[AttributeReference]
+ }
+ ).head
+ }
+
+ val translatedCondition =
+ normalizedCondition.flatMap(DataSourceV2Strategy.translateFilterV2(_))
+ val translatedJoinType = DataSourceStrategy.translateJoinType(joinType)
+
+ if (translatedJoinType.isDefined &&
+ translatedCondition.isDefined &&
+ lBuilder.pushDownJoin(
+ rBuilder,
+ translatedJoinType.get,
+ leftSideRequiredColumnsWithAliases,
+ rightSideRequiredColumnsWithAliases,
+ translatedCondition.get)
+ ) {
+ leftHolder.joinedRelations = leftHolder.joinedRelations ++
rightHolder.joinedRelations
+ leftHolder.pushedPredicates = leftHolder.pushedPredicates ++
+ rightHolder.pushedPredicates :+ translatedCondition.get
+
+ leftHolder.output = node.output.asInstanceOf[Seq[AttributeReference]]
+ leftHolder.pushedJoinOutputMap = pushedJoinOutputMap
+
+ leftHolder
+ } else {
+ node
+ }
+ }
+
+ def generateJoinOutputAlias(name: String): String =
+ s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
Review Comment:
If we alias all columns, then yes, we don't have that issue.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,142 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}
+ def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ // Join can be attempted to be pushed down only if left and right side of
join are
+ // compatible (same data source, for example). Also, another requirement
is that if
+ // there are projections between Join and ScanBuilderHolder, these
projections need to be
+ // AttributeReferences. We could probably support Alias as well, but this
should be on
+ // TODO list.
+ // Alias can exist between Join and sHolder node because the query below
is not valid:
+ // SELECT * FROM
+ // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+ // JOIN
+ // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+ // ON p.t1.col = q.t3.col (this is not possible)
+ // It's because there are duplicated columns in both sides of top level
join and it's not
+ // possible to fully qualified the column names in condition. Therefore,
query should be
+ // rewritten so that each of the outputs of child joins are aliased, so
there would be a
+ // projection with aliases between top level join and scanBuilderHolder
(that has pushed
+ // child joins).
+ case node @ Join(
+ PhysicalOperation(
+ leftProjections,
+ Nil,
+ leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+ ),
+ PhysicalOperation(
+ rightProjections,
+ Nil,
+ rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+ ),
+ joinType,
+ condition,
+ _) if conf.dataSourceV2JoinPushdown &&
+ // We do not support pushing down anything besides AttributeReference.
+ leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ // Cross joins are not supported because they increase the amount of
data.
+ condition.isDefined &&
+ // Joins on top of sampled tables are not supported
+ leftHolder.pushedSample.isEmpty &&
+ rightHolder.pushedSample.isEmpty &&
+ lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+ val leftSideRequiredColumnNames =
getRequiredColumnNames(leftProjections, leftHolder)
+ val rightSideRequiredColumnNames =
getRequiredColumnNames(rightProjections, rightHolder)
+
+ // Alias the duplicated columns from left side of the join.
+ val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map
{ name =>
+ val aliasName =
+ if (leftSideRequiredColumnNames.count(_ == name) > 1 ||
+ rightSideRequiredColumnNames.contains(name)) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Aliasing of duplicated columns in right side is done only if there
are duplicates in
+ // right side only. There won't be a conflict with left side columns
because they are
+ // already aliased.
+ val rightSideRequiredColumnsWithAliases =
rightSideRequiredColumnNames.map { name =>
+ val aliasName =
+ if (rightSideRequiredColumnNames.count(_ == name) > 1) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Create the AttributeMap that holds (Attribute -> Attribute with up to
date name) mapping.
+ val pushedJoinOutputMap = AttributeMap[Expression](
+ node.output.asInstanceOf[Seq[AttributeReference]]
+ .zip(leftSideRequiredColumnsWithAliases ++
rightSideRequiredColumnsWithAliases)
+ .collect {
+ case (attr, columnWithAlias) if columnWithAlias.alias() != null =>
+ (attr, attr.withName(columnWithAlias.alias()))
+ }
+ .toMap
+ )
+
+ // Reuse the previously calculated map to update the condition with
attributes
+ // with up-to-date names
+ val normalizedCondition = condition.map { e =>
+ DataSourceStrategy.normalizeExprs(
+ Seq(e),
+ (leftHolder.output ++ rightHolder.output).map { a =>
+ pushedJoinOutputMap.getOrElse(a,
a).asInstanceOf[AttributeReference]
+ }
+ ).head
+ }
+
+ val translatedCondition =
+ normalizedCondition.flatMap(DataSourceV2Strategy.translateFilterV2(_))
+ val translatedJoinType = DataSourceStrategy.translateJoinType(joinType)
+
+ if (translatedJoinType.isDefined &&
+ translatedCondition.isDefined &&
+ lBuilder.pushDownJoin(
+ rBuilder,
+ translatedJoinType.get,
+ leftSideRequiredColumnsWithAliases,
+ rightSideRequiredColumnsWithAliases,
+ translatedCondition.get)
+ ) {
+ leftHolder.joinedRelations = leftHolder.joinedRelations ++
rightHolder.joinedRelations
+ leftHolder.pushedPredicates = leftHolder.pushedPredicates ++
+ rightHolder.pushedPredicates :+ translatedCondition.get
+
+ leftHolder.output = node.output.asInstanceOf[Seq[AttributeReference]]
+ leftHolder.pushedJoinOutputMap = pushedJoinOutputMap
+
+ leftHolder
+ } else {
+ node
+ }
+ }
+
+ def generateJoinOutputAlias(name: String): String =
+ s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
Review Comment:
@cloud-fan, what do you think about reverting to aliasing all the
columns instead of only the duplicated ones?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}
+ def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ // Join can be attempted to be pushed down only if left and right side of
join are
+ // compatible (same data source, for example). Also, another requirement
is that if
+ // there are projections between Join and ScanBuilderHolder, these
projections need to be
+ // AttributeReferences. We could probably support Alias as well, but this
should be on
+ // TODO list.
+ // Alias can exist between Join and sHolder node because the query below
is not valid:
+ // SELECT * FROM
+ // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+ // JOIN
+ // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+ // ON p.t1.col = q.t3.col (this is not possible)
+ // It's because there are 2 same tables in both sides of top level join
and it not possible
+ // to fully qualified the column names in condition. Therefore, query
should be rewritten so
+ // that each of the outputs of child joins are aliased, so there would be
a projection
+ // with aliases between top level join and scanBuilderHolder (that has
pushed child joins).
+ case node @ Join(
+ PhysicalOperation(
+ leftProjections,
+ Nil,
+ leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+ ),
+ PhysicalOperation(
+ rightProjections,
+ Nil,
+ rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+ ),
+ joinType,
+ condition,
+ _) if conf.dataSourceV2JoinPushdown &&
+ // We do not support pushing down anything besides AttributeReference.
+ leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ // Cross joins are not supported because they increase the amount of
data.
+ condition.isDefined &&
+ // Joins on top of sampled tables are not supported
+ leftHolder.pushedSample.isEmpty &&
+ rightHolder.pushedSample.isEmpty &&
+ lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+ // projections' names are maybe not up to date if the joins have been
previously pushed down.
+ // For this reason, we need to use pushedJoinOutputMap to get up to date
names.
+ def getRequiredColumnNames(
+ projections: Seq[NamedExpression],
+ sHolder: ScanBuilderHolder): Array[String] = {
+ val normalizedProjections = DataSourceStrategy.normalizeExprs(
+ projections,
+ sHolder.output.map { a =>
+ sHolder.pushedJoinOutputMap.getOrElse(a,
a).asInstanceOf[AttributeReference]
+ }
+ ).asInstanceOf[Seq[AttributeReference]]
+
+ normalizedProjections.map(_.name).toArray
+ }
+
+ def generateJoinOutputAlias(name: String): String =
+ s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
+
+ val leftSideRequiredColumnNames =
getRequiredColumnNames(leftProjections, leftHolder)
+ val rightSideRequiredColumnNames =
getRequiredColumnNames(rightProjections, rightHolder)
+
+ // Alias the duplicated columns from left side of the join.
+ val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map
{ name =>
+ val aliasName = if (rightSideRequiredColumnNames.contains(name)) {
+ generateJoinOutputAlias(name)
+ } else {
+ null
+ }
+
+ new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+ }
+
+ // Aliasing of duplicated columns in right side of the join is not
needed because the
+ // the conflicts are resolved by aliasing the left side.
+ val rightSideRequiredColumnsWithAliases =
rightSideRequiredColumnNames.map { field =>
+ new SupportsPushDownJoin.ColumnWithAlias(field, null)
+ }
+
+ // Create the AttributeMap that holds (Attribute -> Attribute with up to
date name) mapping.
+ val pushedJoinOutputMap = AttributeMap[Expression](
Review Comment:
The type is already stated explicitly (`AttributeMap`). Let's leave it as it is.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]