[jira] [Created] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-05 Thread Benyi Wang (JIRA)
Benyi Wang created SPARK-22211:
--

 Summary: LimitPushDown optimization for FullOuterJoin generates 
wrong results
 Key: SPARK-22211
 URL: https://issues.apache.org/jira/browse/SPARK-22211
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
 Environment: on community.cloud.databricks.com 
Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
Reporter: Benyi Wang


LimitPushDown pushes a LocalLimit to one side of a FullOuterJoin, but this may 
generate wrong results:

Assume we use limit(1) and the LocalLimit is pushed to the left side, selecting the 
row with id=999. If the right side has 100K rows, including one with id=999, the 
join result will be
- one row (999, 999)
- the remaining rows (null, xxx)

Once you call show(), the row (999, 999) has only a 1/10th chance of being 
selected by CollectLimit.

A correct optimization might be to
- push down the limit, and
- convert the join to a broadcast LeftOuterJoin or RightOuterJoin, as sketched below.
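
A rough manual sketch of that idea, reusing the {{dl}}/{{dr}} frames from the notebook 
below (this is only an illustration of the suggestion, not an optimizer rule):

{code:java}
import org.apache.spark.sql.functions.broadcast

// Limit the left side first, then left-outer-join it against the broadcast right side.
// Every surviving left row keeps its right-side match, so the matched row cannot be
// dropped the way CollectLimit can drop it after a full outer join.
val suggested = dl.limit(1).join(broadcast(dr), dl("id") === dr("id"), "left_outer")
suggested.show(false)
{code}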

Here is my notebook:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
{code:java}
import scala.util.Random._

val dl = shuffle(1 to 10).toDF("id")
val dr = shuffle(1 to 10).toDF("id")

println("data frame dl:")
dl.explain

println("data frame dr:")
dr.explain

val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)

j.explain

j.show(false)
{code}

{code}
data frame dl:
== Physical Plan ==
LocalTableScan [id#10]
data frame dr:
== Physical Plan ==
LocalTableScan [id#16]
== Physical Plan ==
CollectLimit 1
+- SortMergeJoin [id#10], [id#16], FullOuter
   :- *Sort [id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#10, 200)
   :     +- *LocalLimit 1
   :        +- LocalTableScan [id#10]
   +- *Sort [id#16 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#16, 200)
         +- LocalTableScan [id#16]
import scala.util.Random._
dl: org.apache.spark.sql.DataFrame = [id: int]
dr: org.apache.spark.sql.DataFrame = [id: int]
j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]

+----+---+
|id  |id |
+----+---+
|null|148|
+----+---+
{code}






[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-12 Thread Benyi Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202278#comment-16202278
 ] 

Benyi Wang commented on SPARK-22211:


I think my suggested solution is correct.

|| Case || Left join key present || Right join key present || Full outer join result ||
| 1 | Y | N | {{(left(\*), null)}} |
| 2 | Y | Y | {{(left(\*), right(\*))}} |
| 3 | N | Y | {{(null, right(\*))}} |
| 4 | N | N | Not applied |

If LimitPushDown pushes the limit to the left side, then regardless of the limit value 
and the size of the left table, some left rows are always selected. In other words, the 
left join keys always exist, so only cases 1 and 2 can occur, and the result is 
effectively a left outer join. Symmetrically, pushing the limit down to the right side 
is equivalent to a right outer join.

The only problem with this method is that case 3 can never show up when pushing down to 
the left side (and case 1 when pushing down to the right side). I would say this is not 
a big issue, because we just want to see some samples of the join result, and the 
benefit is huge. If we want to see left-only or right-only rows, we can add a where 
clause, as sketched below.
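
For example, a rough sketch of sampling right-only rows with an explicit where clause, 
reusing the {{dl}}/{{dr}} frames from the description (just an illustration, assuming 
the same column names):

{code:java}
// Keep only rows that exist on the right side but have no left-side match,
// then take a small sample of them.
val rightOnly = dl.join(dr, dl("id") === dr("id"), "full_outer")
  .where(dl("id").isNull)
  .limit(1)
rightOnly.show(false)
{code}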




[jira] [Created] (SPARK-6589) SQLUserDefinedType failed in spark-shell

2015-03-28 Thread Benyi Wang (JIRA)
Benyi Wang created SPARK-6589:
-

 Summary: SQLUserDefinedType failed in spark-shell
 Key: SPARK-6589
 URL: https://issues.apache.org/jira/browse/SPARK-6589
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
 Environment: CDH 5.3.2
Reporter: Benyi Wang


{{DataType.fromJson}} will fail in spark-shell if the schema includes a "udt" field. It 
works when running in an application.

As a result, I cannot read a Parquet file that includes a UDT field. 
{{DataType.fromCaseClass}} does not support UDT either.

I can load the class which shows that my UDT is in the classpath.
{code}
scala> Class.forName("com.bwang.MyTestUDT")
res6: Class[_] = class com.bwang.MyTestUDT
{code}

But {{DataType.fromJson}} fails:
{code}
scala> DataType.fromJson(json)  

java.lang.ClassNotFoundException: com.bwang.MyTestUDT
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at org.apache.spark.sql.catalyst.types.DataType$.parseDataType(dataTypes.scala:77)
{code}

The reason is that {{DataType.fromJson}} tries to load {{udtClass}} using this code:
{code}
case JSortedObject(
    ("class", JString(udtClass)),
    ("pyClass", _),
    ("sqlType", _),
    ("type", JString("udt"))) =>
  Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
{code}

Unfortunately, my UDT is loaded by {{SparkIMain$TranslatingClassLoader}}, but DataType 
is loaded by {{Launcher$AppClassLoader}}, so the single-argument {{Class.forName}} 
resolves against the wrong class loader and cannot see the REPL-defined class.

{code}
scala> DataType.getClass.getClassLoader
res2: ClassLoader = sun.misc.Launcher$AppClassLoader@6876fb1b

scala> this.getClass.getClassLoader
res3: ClassLoader = org.apache.spark.repl.SparkIMain$TranslatingClassLoader@63d36b29
{code}
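
For illustration only, a sketch of a class-loader-aware lookup that would work in the 
shell (an assumed possible fix, not Spark's actual code):

{code}
// Hypothetical helper, not part of Spark: resolve the UDT class through the thread
// context class loader (set by the REPL) and fall back to this class's own loader.
def loadUdtClass(udtClass: String): Class[_] = {
  val loader = Option(Thread.currentThread().getContextClassLoader)
    .getOrElse(getClass.getClassLoader)
  Class.forName(udtClass, /* initialize = */ true, loader)
}
{code}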






[jira] [Commented] (SPARK-6589) SQLUserDefinedType failed in spark-shell

2015-03-28 Thread Benyi Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385499#comment-14385499
 ] 

Benyi Wang commented on SPARK-6589:
---

I found a way to work around this issue, but I still think DataType should use a better 
way to find the correct class loader.
{code}
# Put the UDT jar on SPARK_CLASSPATH so that Launcher$AppClassLoader can find it.
export SPARK_CLASSPATH=myUDT.jar

spark-shell --jars myUDT.jar ...
{code}
