[jira] [Resolved] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-15 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole resolved SPARK-21034.
---
Resolution: Duplicate

> Allow filter pushdown filters through non deterministic functions for columns 
> involved in groupby / join
> 
>
> Key: SPARK-21034
> URL: https://issues.apache.org/jira/browse/SPARK-21034
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> If a column is used as a grouping or join key, pushing down a filter on that
> column should not change the results.
> Here is some sample code:
> {code:java}
> from pyspark.sql import functions as F
> df = spark.createDataFrame([{"a": 1, "b": 2, "c": 7}, {"a": 3, "b": 4, "c": 8},
>                             {"a": 1, "b": 5, "c": 7}, {"a": 1, "b": 6, "c": 7}])
> df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
> +- Exchange hashpartitioning(a#15L, 4)
>    +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
>       +- *Project [a#15L, b#16L]
>          +- *Filter (isnotnull(a#15L) && (a#15L = 1))
>             +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> >>>
> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
>    +- Exchange hashpartitioning(a#15L, 4)
>       +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
>          +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> {code}
> As you can see, the filter is not pushed down when the F.first aggregate
> function is used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-15 Thread Abhijit Bhole (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16127454#comment-16127454
 ] 

Abhijit Bhole commented on SPARK-21034:
---

Thank you !!





[jira] [Issue Comment Deleted] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-14 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Comment: was deleted

(was: In FilterPushdownSuite.scala, it seems this case should already be
handled? Am I misunderstanding something?

{code:java}
test("nondeterministic: push down part of filter through aggregate with deterministic field") {
  val originalQuery = testRelation
    .groupBy('a)('a)
    .where('a > 5 && Rand(10) > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a > 5)
    .groupBy('a)('a)
    .where(Rand(10) > 5)
    .analyze

  comparePlans(optimized, correctAnswer)
}
{code}
)




[jira] [Commented] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-14 Thread Abhijit Bhole (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125619#comment-16125619
 ] 

Abhijit Bhole commented on SPARK-21034:
---

In FilterPushdownSuite.scala, it seems this case should already be handled?
Am I misunderstanding something?

{code:java}
test("nondeterministic: push down part of filter through aggregate with deterministic field") {
  val originalQuery = testRelation
    .groupBy('a)('a)
    .where('a > 5 && Rand(10) > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a > 5)
    .groupBy('a)('a)
    .where(Rand(10) > 5)
    .analyze

  comparePlans(optimized, correctAnswer)
}
{code}
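
A minimal sketch of what that test expects, written in plain Python rather than Catalyst: the AND-ed terms of the filter are split, and only terms that are deterministic and refer solely to grouping columns move below the aggregate. `Conjunct` and `split_for_pushdown` are hypothetical names used for illustration only; they are not Spark APIs.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Conjunct:
    """One AND-ed term of a filter condition (hypothetical model, not a Spark class)."""
    text: str
    references: FrozenSet[str]
    deterministic: bool


def split_for_pushdown(
    conjuncts: List[Conjunct], grouping_keys: FrozenSet[str]
) -> Tuple[List[Conjunct], List[Conjunct]]:
    """Return (terms to push below the aggregate, terms that stay above it)."""
    pushed, kept = [], []
    for term in conjuncts:
        # A term may move only if it is deterministic and touches grouping keys alone.
        if term.deterministic and term.references <= grouping_keys:
            pushed.append(term)
        else:
            kept.append(term)
    return pushed, kept


# Mirrors the condition in the test: 'a > 5 && Rand(10) > 5, grouped by a.
condition = [
    Conjunct("a > 5", frozenset({"a"}), deterministic=True),
    Conjunct("rand(10) > 5", frozenset(), deterministic=False),
]
pushed, kept = split_for_pushdown(condition, frozenset({"a"}))
print([term.text for term in pushed])
print([term.text for term in kept])
```

In this model the `a > 5` term ends up below the aggregate and the `rand(10) > 5` term stays above it, which is the shape of `correctAnswer` in the test.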





[jira] [Updated] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-09 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Description: 
If a column is used as a grouping or join key, pushing down a filter on that
column should not change the results.

Here is some sample code:


{code:java}
from pyspark.sql import functions as F

df = spark.createDataFrame([{"a": 1, "b": 2, "c": 7}, {"a": 3, "b": 4, "c": 8},
                            {"a": 1, "b": 5, "c": 7}, {"a": 1, "b": 6, "c": 7}])

df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()

df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()

== Physical Plan ==
*HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
+- Exchange hashpartitioning(a#15L, 4)
   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
      +- *Project [a#15L, b#16L]
         +- *Filter (isnotnull(a#15L) && (a#15L = 1))
            +- Scan ExistingRDD[a#15L,b#16L,c#17L]
>>>
>>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
== Physical Plan ==
*Filter (isnotnull(a#15L) && (a#15L = 1))
+- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
   +- Exchange hashpartitioning(a#15L, 4)
      +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
         +- Scan ExistingRDD[a#15L,b#16L,c#17L]
{code}


As you can see, the filter is not pushed down when the F.first aggregate
function is used.

  was:
Here is some sample code:


{code:java}
from pyspark.sql import functions as F

df = spark.createDataFrame([{"a": 1, "b": 2, "c": 7}, {"a": 3, "b": 4, "c": 8},
                            {"a": 1, "b": 5, "c": 7}, {"a": 1, "b": 6, "c": 7}])

df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()

df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()

== Physical Plan ==
*HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
+- Exchange hashpartitioning(a#15L, 4)
   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
      +- *Project [a#15L, b#16L]
         +- *Filter (isnotnull(a#15L) && (a#15L = 1))
            +- Scan ExistingRDD[a#15L,b#16L,c#17L]
>>>
>>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
== Physical Plan ==
*Filter (isnotnull(a#15L) && (a#15L = 1))
+- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
   +- Exchange hashpartitioning(a#15L, 4)
      +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
         +- Scan ExistingRDD[a#15L,b#16L,c#17L]
{code}


As you can see, the filter is not pushed down when the F.first aggregate
function is used.





[jira] [Updated] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-09 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Summary: Allow filter pushdown filters through non deterministic functions 
for columns involved in groupby / join  (was: Filter not getting pushed down 
the groupBy clause when first() or last() aggregate function is used)




[jira] [Commented] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-08-02 Thread Abhijit Bhole (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112239#comment-16112239
 ] 

Abhijit Bhole commented on SPARK-21034:
---

Is there any example where pushing down a filter on one of the groupBy columns
can affect the final result when FIRST() is used?
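
One way to probe the question is a sketch in plain Python rather than Spark: group the sample rows from the issue by `a`, compute `sum(b)` and `first(c)`, and compare filtering on `a` before and after the aggregation. The helper `aggregate` is hypothetical and processes rows in input order, which Spark does not guarantee. For a fixed order the two results agree, because a filter on the grouping key only drops whole groups and never changes which rows fall inside a surviving group.

```python
# Sample rows from the issue description.
rows = [
    {"a": 1, "b": 2, "c": 7},
    {"a": 3, "b": 4, "c": 8},
    {"a": 1, "b": 5, "c": 7},
    {"a": 1, "b": 6, "c": 7},
]


def aggregate(rows):
    """Group by 'a'; compute sum(b) and first(c) in input order (hypothetical helper)."""
    groups = {}
    for row in rows:
        key = row["a"]
        if key not in groups:
            # first(c) is taken from the first row seen for this key.
            groups[key] = {"a": key, "sum_b": 0, "first_c": row["c"]}
        groups[key]["sum_b"] += row["b"]
    return list(groups.values())


# Filter above the aggregate (the plan Spark produced when first() is present).
filter_after = [group for group in aggregate(rows) if group["a"] == 1]

# Filter below the aggregate (the plan the issue asks for).
filter_before = aggregate([row for row in rows if row["a"] == 1])

print(filter_after == filter_before)
```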




[jira] [Commented] (SPARK-21479) Outer join filter pushdown in null supplying table when condition is on one of the joined columns

2017-07-28 Thread Abhijit Bhole (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104708#comment-16104708
 ] 

Abhijit Bhole commented on SPARK-21479:
---

So here is the actual use case:

{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([{"x": 'c1', "a": 1, "b": 2}, {"x": 'c2', "a": 3, "b": 4}])
df2 = spark.createDataFrame([{"x": 'c1', "a": 1, "c": 5}, {"x": 'c1', "a": 3, "c": 6},
                             {"x": 'c2', "a": 5, "c": 8}])

# Filter on b, a column that exists only in df1.
df1.join(df2, ['x', 'a'], 'right_outer').where("b = 2").explain()

df1.join(df2, ['x', 'a'], 'right_outer').where("b = 2").show()

print()

df1 = spark.createDataFrame([{"x": 'c1', "a": 1, "b": 2}, {"x": 'c2', "a": 3, "b": 4}])
df2 = spark.createDataFrame([{"x": 'c1', "a": 1, "c": 5}, {"x": 'c1', "a": 3, "c": 6},
                             {"x": 'c2', "a": 5, "c": 8}])

# Filter on x, one of the join columns.
df1.join(df2, ['x', 'a'], 'right_outer').where("x = 'c1'").explain()

df1.join(df2, ['x', 'a'], 'right_outer').where("x = 'c1'").show()
{code}

Output - 

{code:java}
== Physical Plan ==
*Project [x#458, a#456L, b#450L, c#457L]
+- *SortMergeJoin [x#451, a#449L], [x#458, a#456L], Inner
   :- *Sort [x#451 ASC NULLS FIRST, a#449L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#451, a#449L, 4)
   :     +- *Filter (((isnotnull(b#450L) && (b#450L = 2)) && isnotnull(x#451)) && isnotnull(a#449L))
   :        +- Scan ExistingRDD[a#449L,b#450L,x#451]
   +- *Sort [x#458 ASC NULLS FIRST, a#456L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#458, a#456L, 4)
         +- *Filter (isnotnull(x#458) && isnotnull(a#456L))
            +- Scan ExistingRDD[a#456L,c#457L,x#458]
+---+---+---+---+
|  x|  a|  b|  c|
+---+---+---+---+
| c1|  1|  2|  5|
+---+---+---+---+


== Physical Plan ==
*Project [x#490, a#488L, b#482L, c#489L]
+- SortMergeJoin [x#483, a#481L], [x#490, a#488L], RightOuter
   :- *Sort [x#483 ASC NULLS FIRST, a#481L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#483, a#481L, 4)
   :     +- Scan ExistingRDD[a#481L,b#482L,x#483]
   +- *Sort [x#490 ASC NULLS FIRST, a#488L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#490, a#488L, 4)
         +- *Filter (isnotnull(x#490) && (x#490 = c1))
            +- Scan ExistingRDD[a#488L,c#489L,x#490]
+---+---+----+---+
|  x|  a|   b|  c|
+---+---+----+---+
| c1|  1|   2|  5|
| c1|  3|null|  6|
+---+---+----+---+
{code}

As you can see, the filter on the 'x' column does not get pushed down to df1.
In our case, 'x' is a company ID in a multi-tenant system, and it is extremely
important to pass this filter to both dataframes, or else it fetches the entire
data for both the tables.
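
The equivalence this request relies on can be checked on the sample data with a plain-Python model of a right outer join. This is only a sketch, not Spark code: `right_outer_join` is a hypothetical helper, and the `left_only` parameter (the left-side columns to pad with nulls) is an assumption of the model. It compares filtering on the join column `x` after the join with filtering both inputs before the join.

```python
df1 = [{"x": "c1", "a": 1, "b": 2}, {"x": "c2", "a": 3, "b": 4}]
df2 = [
    {"x": "c1", "a": 1, "c": 5},
    {"x": "c1", "a": 3, "c": 6},
    {"x": "c2", "a": 5, "c": 8},
]


def right_outer_join(left, right, keys, left_only):
    """Model of a right outer join: every right row survives; unmatched ones get None."""
    joined = []
    for r in right:
        matches = [l for l in left if all(l[k] == r[k] for k in keys)]
        if not matches:
            # No partner on the left: pad the left-only columns with nulls.
            matches = [{col: None for col in left_only}]
        for l in matches:
            row = dict(r)
            row.update({col: l[col] for col in left_only})
            joined.append(row)
    return joined


# Filter applied above the join, as written in the query.
filter_after = [
    row for row in right_outer_join(df1, df2, ["x", "a"], ["b"]) if row["x"] == "c1"
]

# Same filter applied to both inputs before the join.
filter_both = right_outer_join(
    [row for row in df1 if row["x"] == "c1"],
    [row for row in df2 if row["x"] == "c1"],
    ["x", "a"],
    ["b"],
)

print(filter_after == filter_both)
```

Because `x` is an equality join key, a left row with a different `x` can never match a surviving right row, so dropping it early does not change the output in this model.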


> Outer join filter pushdown in null supplying table when condition is on one 
> of the joined columns
> -
>
> Key: SPARK-21479
> URL: https://issues.apache.org/jira/browse/SPARK-21479
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.0, 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> Here are two different query plans:
> {code:java}
> df1 = spark.createDataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
> df2 = spark.createDataFrame([{"a": 1, "c": 5}, {"a": 3, "c": 6}, {"a": 5, "c": 8}])
> df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()
> == Physical Plan ==
> *Project [a#16299L, b#16295L, c#16300L]
> +- *SortMergeJoin [a#16294L], [a#16299L], Inner
>    :- *Sort [a#16294L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#16294L, 4)
>    :     +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && isnotnull(a#16294L))
>    :        +- Scan ExistingRDD[a#16294L,b#16295L]
>    +- *Sort [a#16299L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#16299L, 4)
>          +- *Filter isnotnull(a#16299L)
>             +- Scan ExistingRDD[a#16299L,c#16300L]
> df1 = spark.createDataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
> df2 = spark.createDataFrame([{"a": 1, "c": 5}, {"a": 3, "c": 6}, {"a": 5, "c": 8}])
> df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()
> == Physical Plan ==
> *Project [a#16314L, b#16310L, c#16315L]
> +- SortMergeJoin [a#16309L], [a#16314L], RightOuter
>    :- *Sort [a#16309L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#16309L, 4)
>    :     +- Scan ExistingRDD[a#16309L,b#16310L]
>    +- *Sort [a#16314L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#16314L, 4)
>          +- *Filter (isnotnull(a#16314L) && (a#16314L = 1))
>             +- Scan ExistingRDD[a#16314L,c#16315L]
> {code}
> If the condition on b can be pushed down to df1, then why not the condition on a?




[jira] [Updated] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-07-20 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Component/s: SQL




[jira] [Updated] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-07-19 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Description: 
Here is some sample code:


{code:java}
from pyspark.sql import functions as F

df = spark.createDataFrame([{"a": 1, "b": 2, "c": 7}, {"a": 3, "b": 4, "c": 8},
                            {"a": 1, "b": 5, "c": 7}, {"a": 1, "b": 6, "c": 7}])

df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()

df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()

== Physical Plan ==
*HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
+- Exchange hashpartitioning(a#15L, 4)
   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
      +- *Project [a#15L, b#16L]
         +- *Filter (isnotnull(a#15L) && (a#15L = 1))
            +- Scan ExistingRDD[a#15L,b#16L,c#17L]
>>>
>>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
== Physical Plan ==
*Filter (isnotnull(a#15L) && (a#15L = 1))
+- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
   +- Exchange hashpartitioning(a#15L, 4)
      +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
         +- Scan ExistingRDD[a#15L,b#16L,c#17L]
{code}


As you can see, the filter is not pushed down when the F.first aggregate
function is used.

  was:
For example, in my sample code:

seriesUserMetricsDF = (userSeriesGameMetricsDF
    .groupBy(['companyId', "seriesId", 'userId'])
    .agg(
        F.last('invitedOn').alias('invitedOn'),
        F.sum('score').alias('score')))

seriesUserMetricsDF.where(F.col('seriesId') == 12345)

the seriesId filter does not get pushed down to userSeriesGameMetricsDF. In
Spark 2.1.0 it does.





[jira] [Updated] (SPARK-21479) Outer join filter pushdown in null supplying table when condition is on one of the joined columns

2017-07-19 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21479:
--
Description: 
Here are two different query plans:


{code:java}

df1 = spark.createDataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
df2 = spark.createDataFrame([{"a": 1, "c": 5}, {"a": 3, "c": 6}, {"a": 5, "c": 8}])

df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()

== Physical Plan ==
*Project [a#16299L, b#16295L, c#16300L]
+- *SortMergeJoin [a#16294L], [a#16299L], Inner
   :- *Sort [a#16294L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#16294L, 4)
   :     +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && isnotnull(a#16294L))
   :        +- Scan ExistingRDD[a#16294L,b#16295L]
   +- *Sort [a#16299L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#16299L, 4)
         +- *Filter isnotnull(a#16299L)
            +- Scan ExistingRDD[a#16299L,c#16300L]


df1 = spark.createDataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
df2 = spark.createDataFrame([{"a": 1, "c": 5}, {"a": 3, "c": 6}, {"a": 5, "c": 8}])

df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()

== Physical Plan ==
*Project [a#16314L, b#16310L, c#16315L]
+- SortMergeJoin [a#16309L], [a#16314L], RightOuter
   :- *Sort [a#16309L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#16309L, 4)
   :     +- Scan ExistingRDD[a#16309L,b#16310L]
   +- *Sort [a#16314L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#16314L, 4)
         +- *Filter (isnotnull(a#16314L) && (a#16314L = 1))
            +- Scan ExistingRDD[a#16314L,c#16315L]
{code}


If the condition on b can be pushed down to df1, then why not the condition on a?

  was:
Here are two different query plans - 

{{
df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 5, 
"c" : 8}])

df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()

== Physical Plan ==
*Project [a#16299L, b#16295L, c#16300L]
+- *SortMergeJoin [a#16294L], [a#16299L], Inner
   :- *Sort [a#16294L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#16294L, 4)
   : +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && 
isnotnull(a#16294L))
   :+- Scan ExistingRDD[a#16294L,b#16295L]
   +- *Sort [a#16299L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#16299L, 4)
 +- *Filter isnotnull(a#16299L)
+- Scan ExistingRDD[a#16299L,c#16300L]


df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 5, "c" : 8}])

df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()

== Physical Plan ==
*Project [a#16314L, b#16310L, c#16315L]
+- SortMergeJoin [a#16309L], [a#16314L], RightOuter
   :- *Sort [a#16309L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#16309L, 4)
   :     +- Scan ExistingRDD[a#16309L,b#16310L]
   +- *Sort [a#16314L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#16314L, 4)
         +- *Filter (isnotnull(a#16314L) && (a#16314L = 1))
            +- Scan ExistingRDD[a#16314L,c#16315L]
}}


If the condition on b can be pushed down to df1, then why can't the condition on a?


> Outer join filter pushdown in null supplying table when condition is on one 
> of the joined columns
> -
>
> Key: SPARK-21479
> URL: https://issues.apache.org/jira/browse/SPARK-21479
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.0, 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> Here are two different query plans - 
> {code:java}
> df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
> df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 5, "c" : 8}])
> df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()
> == Physical Plan ==
> *Project [a#16299L, b#16295L, c#16300L]
> +- *SortMergeJoin [a#16294L], [a#16299L], Inner
>    :- *Sort [a#16294L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#16294L, 4)
>    :     +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && isnotnull(a#16294L))
>    :        +- Scan ExistingRDD[a#16294L,b#16295L]
>    +- *Sort [a#16299L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#16299L, 4)
>          +- *Filter isnotnull(a#16299L)
>             +- Scan ExistingRDD[a#16299L,c#16300L]
> df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
> df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 5, "c" : 8}])
> df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()
> == Physical Plan ==
> *Project [a#16314L, b#16310L, c#16315L]
> +- S

[jira] [Created] (SPARK-21479) Outer join filter pushdown in null supplying table when condition is on one of the joined columns

2017-07-19 Thread Abhijit Bhole (JIRA)
Abhijit Bhole created SPARK-21479:
-

 Summary: Outer join filter pushdown in null supplying table when 
condition is on one of the joined columns
 Key: SPARK-21479
 URL: https://issues.apache.org/jira/browse/SPARK-21479
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, SQL
Affects Versions: 2.2.0, 2.1.1, 2.1.0
Reporter: Abhijit Bhole


Here are two different query plans - 

{{
df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 5, "c" : 8}])

df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()

== Physical Plan ==
*Project [a#16299L, b#16295L, c#16300L]
+- *SortMergeJoin [a#16294L], [a#16299L], Inner
   :- *Sort [a#16294L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#16294L, 4)
   :     +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && isnotnull(a#16294L))
   :        +- Scan ExistingRDD[a#16294L,b#16295L]
   +- *Sort [a#16299L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#16299L, 4)
         +- *Filter isnotnull(a#16299L)
            +- Scan ExistingRDD[a#16299L,c#16300L]


df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 5, "c" : 8}])

df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()

== Physical Plan ==
*Project [a#16314L, b#16310L, c#16315L]
+- SortMergeJoin [a#16309L], [a#16314L], RightOuter
   :- *Sort [a#16309L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#16309L, 4)
   :     +- Scan ExistingRDD[a#16309L,b#16310L]
   +- *Sort [a#16314L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#16314L, 4)
         +- *Filter (isnotnull(a#16314L) && (a#16314L = 1))
            +- Scan ExistingRDD[a#16314L,c#16315L]
}}


If the condition on b can be pushed down to df1, then why can't the condition on a?
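
As a rough illustration of why pushing the condition on a to both sides looks safe for this query, here is a pure-Python sketch (no Spark required) that models the right outer join on the sample rows. The helper right_outer_join and its left_cols parameter are hypothetical names made up for this example; the sketch only checks the equivalence on this data and says nothing about how Spark's optimizer is implemented.

```python
# Minimal model of a right outer join on a single key column.
# `right_outer_join` is a hypothetical helper, not a Spark API.

def right_outer_join(left, right, key, left_cols):
    """Join rows (dicts) on `key`, keeping every right row.

    Right rows without a match get None for each column in `left_cols`.
    """
    joined = []
    for r in right:
        matches = [l for l in left if l[key] is not None and l[key] == r[key]]
        if matches:
            for l in matches:
                joined.append({**l, **r})
        else:
            joined.append({**{c: None for c in left_cols}, **r})
    return joined


df1 = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
df2 = [{"a": 1, "c": 5}, {"a": 3, "c": 6}, {"a": 5, "c": 8}]


def filter_after_join(value):
    # Filter on the join column after joining, as in the reported query.
    return [row for row in right_outer_join(df1, df2, "a", ["b"]) if row["a"] == value]


def filter_before_join(value):
    # Apply the same condition to both inputs first, then join.
    left = [row for row in df1 if row["a"] == value]
    right = [row for row in df2 if row["a"] == value]
    return right_outer_join(left, right, "a", ["b"])


for value in (1, 3, 5):
    print(value, filter_after_join(value) == filter_before_join(value))
```

Left rows with a different value of a could never match a surviving right row, so dropping them early does not change the output on this sample, including the case a = 5 where the right row has no match at all.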



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-07-13 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Affects Version/s: 2.2.0

> Filter not getting pushed down the groupBy clause when first() or last() 
> aggregate function is used
> ---
>
> Key: SPARK-21034
> URL: https://issues.apache.org/jira/browse/SPARK-21034
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> For example, in my sample code - 
> seriesUserMetricsDF = (userSeriesGameMetricsDF
>     .groupBy(['companyId', "seriesId", 'userId'])
>     .agg(
>         F.last('invitedOn').alias('invitedOn'),
>         F.sum('score').alias('score')))
> seriesUserMetricsDF.where(F.col('seriesId') == 12345)
> the seriesId filter does not get pushed down to userSeriesGameMetricsDF. In 
> Spark 2.1.0 it does.






[jira] [Created] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-06-09 Thread Abhijit Bhole (JIRA)
Abhijit Bhole created SPARK-21034:
-

 Summary: Filter not getting pushed down the groupBy clause when 
first() or last() aggregate function is used
 Key: SPARK-21034
 URL: https://issues.apache.org/jira/browse/SPARK-21034
 Project: Spark
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 2.1.1
Reporter: Abhijit Bhole


For example, in my sample code - 

seriesUserMetricsDF = (userSeriesGameMetricsDF
    .groupBy(['companyId', "seriesId", 'userId'])
    .agg(
        F.last('invitedOn').alias('invitedOn'),
        F.sum('score').alias('score')))

seriesUserMetricsDF.where(F.col('seriesId') == 12345)

the seriesId filter does not get pushed down to userSeriesGameMetricsDF. In 
Spark 2.1.0 it does.
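
To make the expectation concrete, here is a small pure-Python sketch (no Spark required) of why a filter on a grouping column should give the same result whether it runs before or after the aggregation, even when last() is one of the aggregates. The rows are made-up sample data, the aggregate helper is a hypothetical name, and the grouping is simplified to seriesId alone rather than the three columns in the code above.

```python
# Hypothetical sample rows; only the column names come from the report.
rows = [
    {"seriesId": 12345, "score": 10, "invitedOn": "2017-01-01"},
    {"seriesId": 99999, "score": 7, "invitedOn": "2017-02-01"},
    {"seriesId": 12345, "score": 5, "invitedOn": "2017-03-01"},
]


def aggregate(rows):
    """Group by seriesId, computing sum(score) and last(invitedOn) in input order."""
    groups = {}
    for row in rows:
        acc = groups.setdefault(
            row["seriesId"],
            {"seriesId": row["seriesId"], "score": 0, "invitedOn": None},
        )
        acc["score"] += row["score"]
        acc["invitedOn"] = row["invitedOn"]  # the last row seen in the group wins
    return list(groups.values())


# Filter on the grouping column after aggregating, as in the reported query.
filter_after = [g for g in aggregate(rows) if g["seriesId"] == 12345]

# Filter first, then aggregate: what a pushed-down filter would do.
filter_before = aggregate([r for r in rows if r["seriesId"] == 12345])

print(filter_after == filter_before)
```

Filtering on the grouping key removes whole groups and leaves the relative order of rows inside each remaining group untouched, so in this model the order-dependent last() sees the same rows in the same order either way.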



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
