[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-27 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r213197519
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,47 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String], database: 
String): Seq[String] = {
+  if (nameParts.size == 1) {
+database +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  tableIdent.database match {
+case Some(db) if resolver(catalog.globalTempViewManager.database, 
db) =>
+  val identifierList = db :: tableIdent.identifier :: Nil
+  namePartsWithDatabase(nameParts, 
catalog.globalTempViewManager.database)
+.corresponds(identifierList)(resolver)
+case _ =>
+  val db = 
tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+  val identifierList = db :: tableIdent.identifier :: Nil
+  namePartsWithDatabase(nameParts, catalog.getCurrentDatabase)
--- End diff --

ok, I'll try.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r213193232
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,47 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String], database: 
String): Seq[String] = {
+  if (nameParts.size == 1) {
+database +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  tableIdent.database match {
+case Some(db) if resolver(catalog.globalTempViewManager.database, 
db) =>
+  val identifierList = db :: tableIdent.identifier :: Nil
+  namePartsWithDatabase(nameParts, 
catalog.globalTempViewManager.database)
+.corresponds(identifierList)(resolver)
+case _ =>
+  val db = 
tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+  val identifierList = db :: tableIdent.identifier :: Nil
+  namePartsWithDatabase(nameParts, catalog.getCurrentDatabase)
--- End diff --

This part will break `temporary view` case. In the following case, no table 
should be broadcasted. Also, could you add more test cases? We need to test 
`table`, `global temporary view`, `temporary view`, and `view`. We may miss 
some cases until now like the following.

```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

sql("set spark.sql.autoBroadcastJoinThreshold=-1")
spark.range(10).write.mode("overwrite").saveAsTable("t")
sql("create temporary view tv as select * from t")
sql("select /*+ mapjoin(default.tv) */ * from t, tv where t.id = 
tv.id").explain
sql("select * from default.tv")

// Exiting paste mode, now interpreting.
== Physical Plan ==
*(2) BroadcastHashJoin [id#7L], [id#12L], Inner, BuildRight
:- *(2) Project [id#7L]
:  +- *(2) Filter isnotnull(id#7L)
: +- *(2) FileScan parquet default.t[id#7L] Batched: true, Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t], 
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
true]))
   +- *(1) Project [id#12L]
  +- *(1) Filter isnotnull(id#12L)
 +- *(1) FileScan parquet default.t[id#12L] Batched: true, Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t], 
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: 
struct
org.apache.spark.sql.AnalysisException: Table or view not found: 
`default`.`tv`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `default`.`tv`
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r213093040
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,51 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String], database: 
String): Seq[String] = {
+  if (nameParts.size == 1) {
+database +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def formatDatabaseName(name: String): String = {
+  if (conf.caseSensitiveAnalysis) name else 
name.toLowerCase(Locale.ROOT)
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  tableIdent.database match {
+case Some(db) if catalog.globalTempViewManager.database == 
formatDatabaseName(db) =>
--- End diff --

Also, we need a case-sensitive test. I made another PR to you for that, 
https://github.com/maropu/spark/pull/3 .


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r213086884
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,51 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String], database: 
String): Seq[String] = {
+  if (nameParts.size == 1) {
+database +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def formatDatabaseName(name: String): String = {
+  if (conf.caseSensitiveAnalysis) name else 
name.toLowerCase(Locale.ROOT)
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  tableIdent.database match {
+case Some(db) if catalog.globalTempViewManager.database == 
formatDatabaseName(db) =>
--- End diff --

Could you use `resolver` here like the following and remove 
`formatDatabaseName` in line 65~67? Since it's a `SessionCatalog` function, 
let's avoid duplication. 
```scala
- case Some(db) if catalog.globalTempViewManager.database == 
formatDatabaseName(db) =>
+ case Some(db) if resolver(catalog.globalTempViewManager.database, db) =>
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r213061786
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
--- End diff --

Ya. Right, please ignore this. We need `catalog` to lookup `global_temp`, 
too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212861566
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] 
= {
+  if (nameParts.size == 1) {
+catalog.getCurrentDatabase +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  val identifierList =
+tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: 
tableIdent.identifier :: Nil
+  
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver)
--- End diff --

oh, yes. I'll refine the pr. thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212861282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] 
= {
+  if (nameParts.size == 1) {
+catalog.getCurrentDatabase +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  val identifierList =
+tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: 
tableIdent.identifier :: Nil
+  
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver)
--- End diff --

BTW, @maropu . In addition,
- The current behavior of `master` branch (Spark 2.4) is `Apply a hint into 
both`.
- The legacy behavior of Spark 2.3.1 is raising an exception for that query.

So, I think it's a good change to become consistent in Spark 2.4.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212852643
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] 
= {
+  if (nameParts.size == 1) {
+catalog.getCurrentDatabase +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  val identifierList =
+tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: 
tableIdent.identifier :: Nil
+  
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver)
--- End diff --

1) First of all, the above two test cases should work as before. 
`global_temp.v1` should be used with the prefix `global_temp.`. Before this PR, 
we cannot put `database` name on Hint. So, we allowed exceptional cases; hints 
on global temporary view (without `global_temp.` prefix).

2) For the case you mentioned, I'd like to interpret `MAPJOIN(v1)` to 
`default.v1 only` because it's the Spark's behavior outside this Hint syntax. 
And, please add a test case for this, too.

@cloud-fan and @gatorsmile . Could you give us some advice, too? Is it okay 
to you?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212851693
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
--- End diff --

We can instead use `getCurrentDatabase: () => String`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212850203
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] 
= {
+  if (nameParts.size == 1) {
+catalog.getCurrentDatabase +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  val identifierList =
+tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: 
tableIdent.identifier :: Nil
+  
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver)
--- End diff --

@dongjoon-hyun a little confused about the name resolution here;
```
"SELECT /*+ MAPJOIN(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id",
```
`MAPJOIN(v1)` implicitly means `global_temp.v1`?
For example;
```
"SELECT /*+ MAPJOIN(v1) */ * FROM default.v1, global_temp.v1 WHERE 
default.v1.id = global_temp.v1.id",
```
In this case, what's the `MAPJOIN(v1)` behaviour?
 - 1. Apply no hint
 - 2. Apply a hint into `default.v1` only
 - 3. Apply a hint into both

WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212844578
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
--- End diff --

I think we can't use `String` there because `currentDatabase` can be 
updatable by others?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212844390
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -144,7 +144,7 @@ class Analyzer(
 
   lazy val batches: Seq[Batch] = Seq(
 Batch("Hints", fixedPoint,
-  new ResolveHints.ResolveBroadcastHints(conf),
+  new ResolveHints.ResolveBroadcastHints(conf, catalog),
--- End diff --

aha, ok.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212843948
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }
 
+  test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution") {
+val (table1Name, table2Name) = ("t1", "t2")
+withTempDatabase { dbName =>
+  withTable(table1Name, table2Name) {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+  spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+  spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+  // First, makes sure a join is not broadcastable
+  val plan = sql(s"SELECT * FROM $dbName.$table1Name, 
$dbName.$table2Name " +
+  s"WHERE $table1Name.id = $table2Name.id")
+.queryExecution.executedPlan
+  assert(plan.collect { case p: BroadcastHashJoinExec => p }.size 
== 0)
+
+  // Uses multi-part table names for broadcast hints
+  def checkIfHintApplied(tableName: String, hintTableName: 
String): Unit = {
--- End diff --

yea, I'll fix.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212822215
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }
 
+  test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution") {
+val (table1Name, table2Name) = ("t1", "t2")
+withTempDatabase { dbName =>
+  withTable(table1Name, table2Name) {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+  spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+  spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+  // First, makes sure a join is not broadcastable
+  val plan = sql(s"SELECT * FROM $dbName.$table1Name, 
$dbName.$table2Name " +
+  s"WHERE $table1Name.id = $table2Name.id")
+.queryExecution.executedPlan
+  assert(plan.collect { case p: BroadcastHashJoinExec => p }.size 
== 0)
+
+  // Uses multi-part table names for broadcast hints
+  def checkIfHintApplied(tableName: String, hintTableName: 
String): Unit = {
--- End diff --

`hintTableName` is never used in this func?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212811422
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
 private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", 
"MAPJOIN")
 
 def resolver: Resolver = conf.resolver
 
-private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: 
Set[String]): LogicalPlan = {
+private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] 
= {
+  if (nameParts.size == 1) {
+catalog.getCurrentDatabase +: nameParts
+  } else {
+nameParts
+  }
+}
+
+private def matchedTableIdentifier(
+nameParts: Seq[String],
+tableIdent: IdentifierWithDatabase): Boolean = {
+  val identifierList =
+tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: 
tableIdent.identifier :: Nil
+  
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver)
--- End diff --

This logic will make a regression (`plan1` in the below) in case of global 
temporary view. Please add the following test case into `GlobalTempViewSuite` 
and revise the logic to handle both cases correctly.

```scala
test("broadcast hint on global temp view") {
import org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join}

spark.range(10).createGlobalTempView("v1")
spark.range(10).createOrReplaceTempView("v2")

val plan1 = sql(s"SELECT /*+ BROADCAST(v1) */ * FROM global_temp.v1, v2 
WHERE v1.id = v2.id")
  .queryExecution.optimizedPlan
assert(plan1.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
assert(!plan1.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])

val plan2 = sql(
  s"SELECT /*+ BROADCAST(global_temp.v1) */ * FROM global_temp.v1, v2 
WHERE v1.id = v2.id")
  .queryExecution.optimizedPlan
assert(plan2.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
assert(!plan2.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
  }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212807892
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -47,20 +49,39 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
--- End diff --

Accordingly, we can use String instead of SessionCatalog.
```scala
-  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) 
extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, currentDatabase: String) 
extends Rule[LogicalPlan] {
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212807880
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -144,7 +144,7 @@ class Analyzer(
 
   lazy val batches: Seq[Batch] = Seq(
 Batch("Hints", fixedPoint,
-  new ResolveHints.ResolveBroadcastHints(conf),
+  new ResolveHints.ResolveBroadcastHints(conf, catalog),
--- End diff --

`catalog` is too big for this since we only use the current database name.
```scala
-  new ResolveHints.ResolveBroadcastHints(conf, catalog),
+  new ResolveHints.ResolveBroadcastHints(conf, 
catalog.getCurrentDatabase),
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212807800
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }
 
+  test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution") {
--- End diff --

`ResolveHintsSuite` is the smallest one for this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212249623
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }
 
+  test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution") {
--- End diff --

Would it be better to move the three tests below into `DataFrameHintSuite`?
- test("broadcast join hint using broadcast function")
- test("broadcast join hint using Dataset.hint") 
- test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution")


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-23 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/22198

[SPARK-25121][SQL] Supports multi-part table names for broadcast hint 
resolution

## What changes were proposed in this pull request?
This pr fixed code to respect a database name for broadcast table hint 
resolution.
Currently, spark ignores a database name in multi-part names;
```
scala> sql("CREATE DATABASE testDb")
scala> spark.range(10).write.saveAsTable("testDb.t")

// without this patch
scala> spark.range(10).join(spark.table("testDb.t"), 
"id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#24L]
+- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
false]))
   :  +- *(1) Range (0, 10, step=1, splits=4)
   +- *(2) Project [id#26L]
  +- *(2) Filter isnotnull(id#26L)
 +- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse...,
 PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: 
struct

// with this patch
scala> spark.range(10).join(spark.table("testDb.t"), 
"id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#3L]
+- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight
   :- *(2) Range (0, 10, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
true]))
  +- *(1) Project [id#5L]
 +- *(1) Filter isnotnull(id#5L)
+- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t],
 PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: 
struct
```

## How was this patch tested?
Added tests in `DataFrameJoinSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-25121

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22198


commit d2be6920ba1cc052e9d5d8364cf48375cea8ba44
Author: Takeshi Yamamuro 
Date:   2018-08-23T07:20:51Z

Fix




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org