[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r213197519 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,47 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String], database: String): Seq[String] = { + if (nameParts.size == 1) { +database +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + tableIdent.database match { +case Some(db) if resolver(catalog.globalTempViewManager.database, db) => + val identifierList = db :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts, catalog.globalTempViewManager.database) +.corresponds(identifierList)(resolver) +case _ => + val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) + val identifierList = db :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts, catalog.getCurrentDatabase) --- End diff -- ok, I'll try. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r213193232 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,47 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String], database: String): Seq[String] = { + if (nameParts.size == 1) { +database +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + tableIdent.database match { +case Some(db) if resolver(catalog.globalTempViewManager.database, db) => + val identifierList = db :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts, catalog.globalTempViewManager.database) +.corresponds(identifierList)(resolver) +case _ => + val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) + val identifierList = db :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts, catalog.getCurrentDatabase) --- End diff -- This part will break `temporary view` case. In the following case, no table should be broadcasted. Also, could you add more test cases? We need to test `table`, `global temporary view`, `temporary view`, and `view`. We may miss some cases until now like the following. 
```scala scala> :paste // Entering paste mode (ctrl-D to finish) sql("set spark.sql.autoBroadcastJoinThreshold=-1") spark.range(10).write.mode("overwrite").saveAsTable("t") sql("create temporary view tv as select * from t") sql("select /*+ mapjoin(default.tv) */ * from t, tv where t.id = tv.id").explain sql("select * from default.tv") // Exiting paste mode, now interpreting. == Physical Plan == *(2) BroadcastHashJoin [id#7L], [id#12L], Inner, BuildRight :- *(2) Project [id#7L] : +- *(2) Filter isnotnull(id#7L) : +- *(2) FileScan parquet default.t[id#7L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])) +- *(1) Project [id#12L] +- *(1) Filter isnotnull(id#12L) +- *(1) FileScan parquet default.t[id#12L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct org.apache.spark.sql.AnalysisException: Table or view not found: `default`.`tv`; line 1 pos 14; 'Project [*] +- 'UnresolvedRelation `default`.`tv` ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r213093040 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,51 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String], database: String): Seq[String] = { + if (nameParts.size == 1) { +database +: nameParts + } else { +nameParts + } +} + +private def formatDatabaseName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + tableIdent.database match { +case Some(db) if catalog.globalTempViewManager.database == formatDatabaseName(db) => --- End diff -- Also, we need a case-sensitive test. I made another PR to you for that, https://github.com/maropu/spark/pull/3 . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r213086884 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,51 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String], database: String): Seq[String] = { + if (nameParts.size == 1) { +database +: nameParts + } else { +nameParts + } +} + +private def formatDatabaseName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + tableIdent.database match { +case Some(db) if catalog.globalTempViewManager.database == formatDatabaseName(db) => --- End diff -- Could you use `resolver` here like the following and remove `formatDatabaseName` in line 65~67? Since it's a `SessionCatalog` function, let's avoid duplication. ```scala - case Some(db) if catalog.globalTempViewManager.database == formatDatabaseName(db) => + case Some(db) if resolver(catalog.globalTempViewManager.database, db) => ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r213061786 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { --- End diff -- Ya. Right, please ignore this. We need `catalog` to lookup `global_temp`, too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212861566 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] = { + if (nameParts.size == 1) { +catalog.getCurrentDatabase +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + val identifierList = +tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver) --- End diff -- oh, yes. I'll refine the pr. thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212861282 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] = { + if (nameParts.size == 1) { +catalog.getCurrentDatabase +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + val identifierList = +tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver) --- End diff -- BTW, @maropu . In addition, - The current behavior of `master` branch (Spark 2.4) is `Apply a hint into both`. - The legacy behavior of Spark 2.3.1 is raising an exception for that query. So, I think it's a good change to become consistent in Spark 2.4. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212852643 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] = { + if (nameParts.size == 1) { +catalog.getCurrentDatabase +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + val identifierList = +tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver) --- End diff -- 1) First of all, the above two test cases should work as before. `global_temp.v1` should be used with the prefix `global_temp.`. Before this PR, we cannot put `database` name on Hint. So, we allowed exceptional cases; hints on global temporary view (without `global_temp.` prefix). 2) For the case you mentioned, I'd like to interpret `MAPJOIN(v1)` to `default.v1 only` because it's the Spark's behavior outside this Hint syntax. And, please add a test case for this, too. @cloud-fan and @gatorsmile . Could you give us some advice, too? Is it okay to you? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212851693 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { --- End diff -- We can instead use `getCurrentDatabase: () => String`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212850203 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] = { + if (nameParts.size == 1) { +catalog.getCurrentDatabase +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + val identifierList = +tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver) --- End diff -- @dongjoon-hyun a little confused about the name resolution here; ``` "SELECT /*+ MAPJOIN(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id", ``` `MAPJOIN(v1)` implicitly means `global_temp.v1`? For example; ``` "SELECT /*+ MAPJOIN(v1) */ * FROM default.v1, global_temp.v1 WHERE default.v1.id = global_temp.v1.id", ``` In this case, what's the `MAPJOIN(v1)` behaviour? - 1. Apply no hint - 2. Apply a hint into `default.v1` only - 3. Apply a hint into both WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212844578 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { --- End diff -- I think we can't use `String` there because `currentDatabase` can be updatable by others? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212844390 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -144,7 +144,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, - new ResolveHints.ResolveBroadcastHints(conf), + new ResolveHints.ResolveBroadcastHints(conf, catalog), --- End diff -- aha, ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212843948 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala --- @@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) } + test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { +val (table1Name, table2Name) = ("t1", "t2") +withTempDatabase { dbName => + withTable(table1Name, table2Name) { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { + spark.range(50).write.saveAsTable(s"$dbName.$table1Name") + spark.range(100).write.saveAsTable(s"$dbName.$table2Name") + // First, makes sure a join is not broadcastable + val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " + + s"WHERE $table1Name.id = $table2Name.id") +.queryExecution.executedPlan + assert(plan.collect { case p: BroadcastHashJoinExec => p }.size == 0) + + // Uses multi-part table names for broadcast hints + def checkIfHintApplied(tableName: String, hintTableName: String): Unit = { --- End diff -- yea, I'll fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212822215 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala --- @@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) } + test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { +val (table1Name, table2Name) = ("t1", "t2") +withTempDatabase { dbName => + withTable(table1Name, table2Name) { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { + spark.range(50).write.saveAsTable(s"$dbName.$table1Name") + spark.range(100).write.saveAsTable(s"$dbName.$table2Name") + // First, makes sure a join is not broadcastable + val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " + + s"WHERE $table1Name.id = $table2Name.id") +.queryExecution.executedPlan + assert(plan.collect { case p: BroadcastHashJoinExec => p }.size == 0) + + // Uses multi-part table names for broadcast hints + def checkIfHintApplied(tableName: String, hintTableName: String): Unit = { --- End diff -- `hintTableName` is never used in this func? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212811422 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") def resolver: Resolver = conf.resolver -private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { +private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] = { + if (nameParts.size == 1) { +catalog.getCurrentDatabase +: nameParts + } else { +nameParts + } +} + +private def matchedTableIdentifier( +nameParts: Seq[String], +tableIdent: IdentifierWithDatabase): Boolean = { + val identifierList = +tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil + namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver) --- End diff -- This logic will make a regression (`plan1` in the below) in case of global temporary view. Please add the following test case into `GlobalTempViewSuite` and revise the logic to handle both cases correctly. 
```scala test("broadcast hint on global temp view") { import org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join} spark.range(10).createGlobalTempView("v1") spark.range(10).createOrReplaceTempView("v2") val plan1 = sql(s"SELECT /*+ BROADCAST(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id") .queryExecution.optimizedPlan assert(plan1.asInstanceOf[Join].left.isInstanceOf[ResolvedHint]) assert(!plan1.asInstanceOf[Join].right.isInstanceOf[ResolvedHint]) val plan2 = sql( s"SELECT /*+ BROADCAST(global_temp.v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id") .queryExecution.optimizedPlan assert(plan2.asInstanceOf[Join].left.isInstanceOf[ResolvedHint]) assert(!plan2.asInstanceOf[Join].right.isInstanceOf[ResolvedHint]) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212807892 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -47,20 +49,39 @@ object ResolveHints { * * This rule must happen before common table expressions. */ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { --- End diff -- Accordingly, we can use String instead of SessionCatalog. ```scala - class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { + class ResolveBroadcastHints(conf: SQLConf, currentDatabase: String) extends Rule[LogicalPlan] { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212807880 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -144,7 +144,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, - new ResolveHints.ResolveBroadcastHints(conf), + new ResolveHints.ResolveBroadcastHints(conf, catalog), --- End diff -- `catalog` is too big for this since we only use the current database name. ```scala - new ResolveHints.ResolveBroadcastHints(conf, catalog), + new ResolveHints.ResolveBroadcastHints(conf, catalog.getCurrentDatabase), ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212807800 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala --- @@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) } + test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { --- End diff -- `ResolveHintsSuite` is the smallest one for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212249623 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala --- @@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) } + test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { --- End diff -- Would it be better to move the three tests below into `DataFrameHintSuite`? - test("broadcast join hint using broadcast function") - test("broadcast join hint using Dataset.hint") - test("SPARK-25121 Supports multi-part names for broadcast hint resolution") --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22198 [SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution ## What changes were proposed in this pull request? This pr fixed code to respect a database name for broadcast table hint resolution. Currently, spark ignores a database name in multi-part names; ``` scala> sql("CREATE DATABASE testDb") scala> spark.range(10).write.saveAsTable("testDb.t") // without this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#24L] +- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) : +- *(1) Range (0, 10, step=1, splits=4) +- *(2) Project [id#26L] +- *(2) Filter isnotnull(id#26L) +- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct // with this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#3L] +- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight :- *(2) Range (0, 10, step=1, splits=4) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])) +- *(1) Project [id#5L] +- *(1) Filter isnotnull(id#5L) +- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct ``` ## How was this patch tested? Added tests in `DataFrameJoinSuite`. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/maropu/spark SPARK-25121 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22198.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22198 commit d2be6920ba1cc052e9d5d8364cf48375cea8ba44 Author: Takeshi Yamamuro Date: 2018-08-23T07:20:51Z Fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org