[ https://issues.apache.org/jira/browse/KYLIN-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347441#comment-17347441 ]
ASF GitHub Bot commented on KYLIN-4925:
---------------------------------------
zzcclp commented on a change in pull request #1601:
URL: https://github.com/apache/kylin/pull/1601#discussion_r635062155
##########
File path: kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.execution
+
+import javax.annotation.concurrent.GuardedBy
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{JoinHint, LogicalPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.{SparkSession, Strategy}
+
+/**
+ * Select the proper physical plan for join based on joining keys and size of logical plan.
+ *
+ * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
+ * predicates can be evaluated by matching join keys. If found, join implementations are chosen
+ * with the following precedence:
+ *
+ * - Broadcast hash join (BHJ):
+ * BHJ is not supported for full outer join. For right outer join, we can only broadcast the
+ * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
+ * we can only broadcast the right side. For inner like join, we can broadcast both sides.
+ * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
+ * small. However, broadcasting tables is a network-intensive operation. It could cause OOM
+ * or perform worse than the other join algorithms, especially when the build/broadcast side
+ * is big.
+ *
+ * For the supported cases, users can specify the broadcast hint (e.g. the user applied the
+ * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and
+ * which join side is broadcast.
+ *
+ * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type
+ * is inner like join), the side with a smaller estimated physical size will be broadcast.
+ * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+ * whose estimated physical size is smaller than the threshold. If both sides are below the
+ * threshold, broadcast the smaller side. If neither side is below the threshold, BHJ is not used.
+ *
+ * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
+ * table.
+ *
+ * - Sort merge: if the matching join keys are sortable.
+ *
+ * If there are no joining keys, join implementations are chosen with the following precedence:
+ * - BroadcastNestedLoopJoin (BNLJ):
+ * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
+ * For right outer join, the left side is broadcast. For left outer, left semi, left anti
+ * and the internal join type ExistenceJoin, the right side is broadcast. For inner like
+ * joins, either side is broadcast.
+ *
+ * Like BHJ, users can still specify the broadcast hint and session-based
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to influence which side is broadcast.
+ *
+ * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for
+ * inner-like join), the side with a smaller estimated physical size will be broadcast.
+ * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+ * whose estimated physical size is smaller than the threshold. If both sides are below the
+ * threshold, broadcast the smaller side. If neither side is below the threshold, BNLJ is not used.
+ *
+ * - CartesianProduct: for inner like join, CartesianProduct is the fallback option.
+ *
+ * - BroadcastNestedLoopJoin (BNLJ):
+ * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast
+ * side with the broadcast hint. If neither side has a hint, we broadcast the side with
+ * the smaller estimated physical size.
+ */
+case class KylinJoinSelection(session: SparkSession) extends Strategy with PredicateHelper with Logging {
+
+ val conf: SQLConf = session.sessionState.conf
+
+ /**
+ * Matches a plan whose output should be small enough to be used in broadcast join.
+ */
+ private def canBroadcast(plan: LogicalPlan): Boolean = {
+ val sizeInBytes = plan.stats.sizeInBytes
+ sizeInBytes >= 0 && sizeInBytes <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(sizeInBytes.toLong)
+ }
+
+ /**
+ * Matches a plan whose single partition should be small enough to build a hash table.
+ *
+ * Note: this assumes that the number of partitions is fixed; it requires additional work if
+ * it's dynamic.
+ */
+ private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
+ plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+ }
+
+ /**
+ * Returns whether plan a is much smaller (3X) than plan b.
+ *
+ * The cost to build a hash map is higher than sorting, so we should only build a hash map on a
+ * table that is much smaller than the other one. Since we do not have the statistic for the
+ * number of rows, use the size in bytes here as an estimate.
+ */
+ private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
+ a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
+ }
+
+ private def canBuildRight(joinType: JoinType): Boolean = joinType match {
+ case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
+ case _ => false
+ }
+
+ private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
+ case _: InnerLike | RightOuter => true
+ case _ => false
+ }
+
+ private def broadcastSide(
+ canBuildLeft: Boolean,
+ canBuildRight: Boolean,
+ left: LogicalPlan,
+ right: LogicalPlan): BuildSide = {
+
+ def smallerSide =
+ if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
+
+ if (canBuildRight && canBuildLeft) {
+ // Broadcast smaller side based on its estimated physical size
+ // if both sides have broadcast hint
+ smallerSide
+ } else if (canBuildRight) {
+ BuildRight
+ } else if (canBuildLeft) {
+ BuildLeft
+ } else {
+ // for the last default broadcast nested loop join
+ smallerSide
+ }
+ }
+
+ private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint)
+ : Boolean = {
+// val buildLeft = canBuildLeft(joinType) && logical.BROADCAST.equals(hint.leftHint.get.strategy.get)
+// val buildRight = canBuildRight(joinType) && logical.BROADCAST.equals(hint.rightHint.get.strategy.get)
+// buildLeft || buildRight
+ false
Review comment:
Why are these lines commented out, and why return false directly?
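To make the precedence in the `KylinJoinSelection` Scaladoc and the commented-out hint check easier to follow, here is a minimal, Spark-free Scala sketch. It is illustrative only: `JoinSelectionSketch`, `SideHint`, `chooseEquiJoin` and the small `JoinType`/`BuildSide` hierarchies are hypothetical stand-ins, not Spark or Kylin APIs. Plan sizes are plain `BigInt` values in place of `plan.stats.sizeInBytes`, the `JoinMemoryManager.acquireMemory` call is omitted, `Cross` and `ExistenceJoin` are left out, hints are modeled as `Option[String]`, and `chooseEquiJoin` follows only the precedence stated in the Scaladoc, since the excerpt ends before the strategy's `apply` method. The hint check uses `Option` combinators: the commented-out lines call `.get` on `hint.leftHint` and on its `strategy`, and `.get` on an empty `Option` throws `NoSuchElementException`, whereas this version treats a missing hint as "no broadcast".

```scala
// Hypothetical, Spark-free model of the join-selection heuristics in the diff.
// None of these names are Spark or Kylin APIs; sizes are plain BigInt values
// standing in for plan.stats.sizeInBytes.
object JoinSelectionSketch {
  sealed trait JoinType
  case object Inner extends JoinType // stands in for InnerLike
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical per-side hint: strategy = Some("BROADCAST") when broadcast was requested.
  final case class SideHint(strategy: Option[String])

  def canBuildRight(jt: JoinType): Boolean = jt match {
    case Inner | LeftOuter | LeftSemi | LeftAnti => true
    case _                                       => false
  }

  def canBuildLeft(jt: JoinType): Boolean = jt match {
    case Inner | RightOuter => true
    case _                  => false
  }

  // Mirrors canBroadcast, without the JoinMemoryManager.acquireMemory check.
  def canBroadcast(size: BigInt, threshold: Long): Boolean =
    size >= 0 && size <= threshold

  // Mirrors canBuildLocalHashMap: the average partition stays under the threshold.
  def canBuildLocalHashMap(size: BigInt, threshold: Long, numPartitions: Int): Boolean =
    size < BigInt(threshold) * numPartitions

  // Mirrors muchSmaller: a is at least 3x smaller than b.
  def muchSmaller(a: BigInt, b: BigInt): Boolean = a * 3 <= b

  // Mirrors broadcastSide: when both (or neither) sides qualify, pick the smaller; ties go right.
  def broadcastSide(buildLeft: Boolean, buildRight: Boolean,
                    leftSize: BigInt, rightSize: BigInt): BuildSide = {
    def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
    if (buildLeft && buildRight) smallerSide
    else if (buildRight) BuildRight
    else if (buildLeft) BuildLeft
    else smallerSide
  }

  private def isBroadcast(h: Option[SideHint]): Boolean =
    h.flatMap(_.strategy).contains("BROADCAST")

  // The check from the commented-out canBroadcastByHints, written so that a side
  // without a hint yields false instead of throwing.
  def canBroadcastByHints(jt: JoinType, left: Option[SideHint], right: Option[SideHint]): Boolean =
    (canBuildLeft(jt) && isBroadcast(left)) || (canBuildRight(jt) && isBroadcast(right))

  // Equi-join precedence as stated in the Scaladoc: hinted BHJ, threshold BHJ,
  // shuffle hash join, then sort merge join (join keys assumed sortable).
  def chooseEquiJoin(jt: JoinType, leftSize: BigInt, rightSize: BigInt, threshold: Long,
                     numPartitions: Int, left: Option[SideHint] = None,
                     right: Option[SideHint] = None): String = {
    val hintL = canBuildLeft(jt) && isBroadcast(left)
    val hintR = canBuildRight(jt) && isBroadcast(right)
    val sizeL = canBuildLeft(jt) && canBroadcast(leftSize, threshold)
    val sizeR = canBuildRight(jt) && canBroadcast(rightSize, threshold)
    if (hintL || hintR) s"BroadcastHashJoin(${broadcastSide(hintL, hintR, leftSize, rightSize)})"
    else if (sizeL || sizeR) s"BroadcastHashJoin(${broadcastSide(sizeL, sizeR, leftSize, rightSize)})"
    else if (canBuildRight(jt) && canBuildLocalHashMap(rightSize, threshold, numPartitions) &&
             muchSmaller(rightSize, leftSize)) "ShuffledHashJoin(BuildRight)"
    else if (canBuildLeft(jt) && canBuildLocalHashMap(leftSize, threshold, numPartitions) &&
             muchSmaller(leftSize, rightSize)) "ShuffledHashJoin(BuildLeft)"
    else "SortMergeJoin"
  }
}

import JoinSelectionSketch._

// A small right side under the threshold is broadcast.
println(chooseEquiJoin(Inner, BigInt(1000000), BigInt(10), 100L, 200)) // prints BroadcastHashJoin(BuildRight)
```

Returning strategy names as strings keeps the sketch dependency-free; in the actual strategy each branch would instead build a physical join operator.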
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Use Spark 3 as build and query engine for Kylin 4
> -------------------------------------------------
>
> Key: KYLIN-4925
> URL: https://issues.apache.org/jira/browse/KYLIN-4925
> Project: Kylin
> Issue Type: New Feature
> Components: Job Engine, Query Engine
> Reporter: Congling Xia
> Assignee: Congling Xia
> Priority: Major
> Fix For: v4.0.0-GA
>
>
> Spark 3 provides a much more developed AQE (Adaptive Query Execution) framework,
> DPP (Dynamic Partition Pruning), and increased support for GPUs and Kubernetes.
> The prospects for performance improvement are very promising.
> We are currently trying to run Kylin 4.0 with Spark 3.0 as the build and query
> engine, and we'd like to share the code changes here.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)