[jira] [Commented] (SPARK-24434) Support user-specified driver and executor pod templates

2018-08-27 Thread Henry Robinson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594046#comment-16594046
 ] 

Henry Robinson commented on SPARK-24434:


Yeah, assignees are set after the PR is merged. I think the idea is to prevent 
someone from assigning an issue to themselves, effectively taking a lock, and 
never actually making progress. 

> Support user-specified driver and executor pod templates
> 
>
> Key: SPARK-24434
> URL: https://issues.apache.org/jira/browse/SPARK-24434
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Priority: Major
>
> With more requests for customizing the driver and executor pods coming, the 
> current approach of adding new Spark configuration options has some serious 
> drawbacks: 1) it means more Kubernetes specific configuration options to 
> maintain, and 2) it widens the gap between the declarative model used by 
> Kubernetes and the configuration model used by Spark. We should start 
> designing a solution that allows users to specify pod templates as central 
> places for all customization needs for the driver and executor pods. 






[jira] [Commented] (SPARK-24432) Add support for dynamic resource allocation

2018-06-18 Thread Henry Robinson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516305#comment-16516305
 ] 

Henry Robinson commented on SPARK-24432:


I'm really interested in this feature. What's the current status? Beyond the 
work of porting the code from the fork, are there design questions still to be 
resolved? Anything that I and my colleagues can help drive forward?

> Add support for dynamic resource allocation
> ---
>
> Key: SPARK-24432
> URL: https://issues.apache.org/jira/browse/SPARK-24432
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Priority: Major
>
> This is an umbrella ticket for work on adding support for dynamic resource 
> allocation into the Kubernetes mode. This requires a Kubernetes-specific 
> external shuffle service. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark.






[jira] [Commented] (SPARK-24374) SPIP: Support Barrier Scheduling in Apache Spark

2018-06-01 Thread Henry Robinson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498465#comment-16498465
 ] 

Henry Robinson commented on SPARK-24374:


The use case in the SPIP isn't 100% convincing. I'm concerned about the idea of 
embedding other execution engines inside Spark tasks, effectively using Spark's 
job scheduler as a cluster manager. Resource consumption would be very 
different at different phases of the specified task (some tasks would be 
waiting, one task would have launched MPI), making it hard for sensible 
allocation decisions to be made. Do you anticipate requiring support from the 
cluster manager for gang scheduling? Or is {{barrier()}} going to be enough to 
ensure that all tasks wait - even if they take up an executor slot on the 
cluster for a long time waiting for the other tasks to get scheduled?

An alternative design for the example in the SPIP document would be to split it 
into more than one Spark job, e.g.:

# (Spark job) write the input files in parallel
# (not via Spark) launch the MPI job via the cluster manager (perhaps a 
kubernetes pod, for example)
# (Spark job) consume the output files in parallel

Spark already has semantics that allow you to wait until a job is finished so 
the driver can naturally coordinate amongst the different phases without 
needing to add extra coordination primitives to individual tasks. But 
synchronous execution is a thing in distributed systems, of course, so maybe 
there are more compelling use cases than the one in the SPIP?  
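
To make that alternative concrete, here's a rough sketch of the three-phase approach above, coordinated entirely from the driver. {{launchMpiJob}} is a stand-in for whatever cluster-manager hook (a Kubernetes client, say) would actually submit the MPI work - it isn't a Spark API, and the paths are made up:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("mpi-phases").getOrCreate()

// Phase 1 (Spark job): write the MPI input files in parallel.
spark.read.parquet("/data/raw").write.parquet("/staging/mpi-input")

// Phase 2 (not Spark): block until the externally launched MPI job finishes.
// `launchMpiJob` is a hypothetical helper that talks to the cluster manager.
launchMpiJob(input = "/staging/mpi-input", output = "/staging/mpi-output")

// Phase 3 (Spark job): consume the MPI output files in parallel.
spark.read.parquet("/staging/mpi-output").write.parquet("/data/results")
{code}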

> SPIP: Support Barrier Scheduling in Apache Spark
> 
>
> Key: SPARK-24374
> URL: https://issues.apache.org/jira/browse/SPARK-24374
> Project: Spark
>  Issue Type: Epic
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: SPIP
> Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf
>
>
> (See details in the linked/attached SPIP doc.)
> {quote}
> The proposal here is to add a new scheduling model to Apache Spark so users 
> can properly embed distributed DL training as a Spark stage to simplify the 
> distributed training workflow. For example, Horovod uses MPI to implement 
> all-reduce to accelerate distributed TensorFlow training. The computation 
> model is different from MapReduce used by Spark. In Spark, a task in a stage 
> doesn’t depend on any other tasks in the same stage, and hence it can be 
> scheduled independently. In MPI, all workers start at the same time and pass 
> messages around. To embed this workload in Spark, we need to introduce a new 
> scheduling model, tentatively named “barrier scheduling”, which launches 
> tasks at the same time and provides users enough information and tooling to 
> embed distributed DL training. Spark can also provide an extra layer of fault 
> tolerance in case some tasks failed in the middle, where Spark would abort 
> all tasks and restart the stage.
> {quote}






[jira] [Created] (SPARK-24393) SQL builtin: isinf

2018-05-25 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-24393:
--

 Summary: SQL builtin: isinf
 Key: SPARK-24393
 URL: https://issues.apache.org/jira/browse/SPARK-24393
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


Along with the existing {{isnan}}, it would be helpful to have {{isinf}} to 
test if a float or double value is {{Infinity}}. 
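
A sketch of the intended usage (the builtin doesn't exist yet, so the {{isinf}} calls are hypothetical), next to the check that has to be spelled out today:

{code}
// Proposed (hypothetical until implemented):
//   spark.sql("SELECT isinf(CAST('Infinity' AS DOUBLE))")  -- would return true
//   spark.sql("SELECT isinf(CAST(1.5 AS DOUBLE))")         -- would return false

// Today, the test has to name both infinities explicitly:
import org.apache.spark.sql.functions.col
val hasInf = df.filter(col("x") === Double.PositiveInfinity ||
                       col("x") === Double.NegativeInfinity)   // df and x are placeholders
{code}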








[jira] [Created] (SPARK-24254) Eagerly evaluate some subqueries over LocalRelation

2018-05-11 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-24254:
--

 Summary: Eagerly evaluate some subqueries over LocalRelation
 Key: SPARK-24254
 URL: https://issues.apache.org/jira/browse/SPARK-24254
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


Some queries would benefit from evaluating subqueries over {{LocalRelations}} 
eagerly. For example:

{code}
SELECT t1.part_col FROM t1 JOIN (SELECT max(part_col) m FROM t2) foo WHERE 
t1.part_col = foo.m
{code}

If {{max(part_col)}} could be evaluated during planning, there's an opportunity 
to prune all but at most one partition from the scan of {{t1}}. 

Similarly, a near-identical query with a non-scalar subquery in the {{WHERE}} 
clause:

{code}
SELECT * FROM t1 WHERE part_col IN (SELECT part_col FROM t2)
{code}

could be partially evaluated to eliminate some partitions, and remove the join 
from the plan. 

Obviously, not all subqueries over local relations can be evaluated during 
planning, but certain whitelisted aggregates could be if the input cardinality 
isn't too high. 
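
To make the intent concrete, a hedged sketch (table names reused from above, data invented):

{code}
// t2 here is a LocalRelation, so max(part_col) is knowable at planning time.
import spark.implicits._
Seq(1, 5, 7).toDF("part_col").createOrReplaceTempView("t2")

// Today this plans as a join against the subquery's result. If the optimizer
// evaluated max(part_col) -> 7 eagerly, the scan of t1 could be pruned to the
// single partition part_col = 7 before execution.
spark.sql(
  "SELECT t1.part_col FROM t1 JOIN (SELECT max(part_col) m FROM t2) foo " +
  "WHERE t1.part_col = foo.m").explain()
{code}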







[jira] [Commented] (SPARK-11150) Dynamic partition pruning

2018-05-10 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471311#comment-16471311
 ] 

Henry Robinson commented on SPARK-11150:


The title of this JIRA is 'dynamic partition pruning', but the examples are a) 
not related to dynamic partition pruning and b) work as expected in Spark 2.3.

Spark will correctly infer, given {{t1.foo = t2.bar AND t2.bar = 1}}, that 
{{t1.foo = 1}}. It will prune partitions statically - at compile time - and 
that is reflected in the scan.

_Dynamic_ partition pruning is about pruning partitions based on information 
that can only be inferred at run time. A typical example is:

{{SELECT * FROM dim_table JOIN fact_table ON (dim_table.partcol = 
fact_table.partcol) WHERE dim_table.othercol > 10}}.

Little can be inferred from the query at compilation time about what partitions 
to scan in {{fact_table}} (except that only the intersection between 
{{fact_table}} and {{dim_table}}'s partitions should be scanned). 

However at run time, the set of partition keys produced by scanning 
{{dim_table}} with the filter predicate can be recorded - usually at the join 
node - and sent to the probe side of the join (in this case {{fact_table}}). 
The scan of {{fact_table}} can use that set to filter out any partitions that 
aren't in the build side of the join, because they wouldn't match any rows 
during the join. Hive and Impala both support this kind of partition filtering 
(and it doesn't only have to apply to partitions - you can filter the rows as 
well if evaluating the predicate isn't too expensive).
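
To illustrate the mechanism, here's the same idea done by hand from the driver (not a proposed API - just a sketch, and it assumes {{partcol}} is an int):

{code}
import spark.implicits._

// "Build" side: evaluate the dimension filter first and collect its partition keys.
val dimKeys = spark.table("dim_table")
  .filter($"othercol" > 10)
  .select($"partcol")
  .distinct()
  .as[Int]
  .collect()

// "Probe" side: push the collected keys into the fact scan as a static IN filter,
// so only the matching partitions of fact_table get read.
val fact = spark.table("fact_table").filter($"partcol".isin(dimKeys: _*))
fact.join(spark.table("dim_table").filter($"othercol" > 10), Seq("partcol")).explain()
{code}

Dynamic pruning would do the equivalent inside a single job, without the extra collect to the driver.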

The challenges are:

* making sure that the representation chosen for the filters is compact enough 
to be shuffled around all the executors that might be performing the scan task, 
while having a low false-positive rate
* adding the logic to the planner to detect these opportunities
* optionally disabling the filtering if it's not being selective enough
* coordinating between the build and probe sides to ensure that the latter waits 
for the former (this is a bit easier in Spark because it's not a pipelined 
execution model)

Do we agree that this JIRA should be made explicitly about dynamic partition 
pruning, or is that tracked elsewhere? If it's tracked elsewhere, I propose 
closing this one; otherwise I can edit this one's description.



> Dynamic partition pruning
> -
>
> Key: SPARK-11150
> URL: https://issues.apache.org/jira/browse/SPARK-11150
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 1.5.1, 1.6.0, 2.0.0, 2.1.2, 2.2.1, 2.3.0
>Reporter: Younes
>Priority: Major
>
> Partitions are not pruned when joined on the partition columns.
> This is the same issue as HIVE-9152.
> Ex: 
> Select * from tab where partcol=1 will prune on value 1
> Select * from tab join dim on (dim.partcol=tab.partcol) where 
> dim.partcol=1 will scan all partitions.
> Tables are based on parquets.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23940) High-order function: transform_values(map<K, V1>, function<K, V1, V2>) → map<K, V2>

2018-04-30 Thread Henry Robinson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Robinson updated SPARK-23940:
---
Summary: High-order function: transform_values(map<K, V1>, function<K, V1, V2>) 
→ map<K, V2>  (was: High-ofer function: transform_values(map<K, V1>, 
function<K, V1, V2>) → map<K, V2>)

> High-order function: transform_values(map<K, V1>, function<K, V1, V2>) → 
> map<K, V2>
> ---
>
> Key: SPARK-23940
> URL: https://issues.apache.org/jira/browse/SPARK-23940
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> Ref: https://prestodb.io/docs/current/functions/map.html
> Returns a map that applies function to each entry of map and transforms the 
> values.
> {noformat}
> SELECT transform_values(MAP(ARRAY[], ARRAY[]), (k, v) -> v + 1); -- {}
> SELECT transform_values(MAP(ARRAY [1, 2, 3], ARRAY [10, 20, 30]), (k, v) -> v 
> + k); -- {1 -> 11, 2 -> 22, 3 -> 33}
> SELECT transform_values(MAP(ARRAY [1, 2, 3], ARRAY ['a', 'b', 'c']), (k, v) 
> -> k * k); -- {1 -> 1, 2 -> 4, 3 -> 9}
> SELECT transform_values(MAP(ARRAY ['a', 'b'], ARRAY [1, 2]), (k, v) -> k || 
> CAST(v as VARCHAR)); -- {a -> a1, b -> b2}
> SELECT transform_values(MAP(ARRAY [1, 2], ARRAY [1.0, 1.4]), -- {1 -> 
> one_1.0, 2 -> two_1.4}
> (k, v) -> MAP(ARRAY[1, 2], ARRAY['one', 'two'])[k] || 
> '_' || CAST(v AS VARCHAR));
> {noformat}






[jira] [Created] (SPARK-24128) Mention spark.sql.crossJoin.enabled in implicit cartesian product error msg

2018-04-30 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-24128:
--

 Summary: Mention spark.sql.crossJoin.enabled in implicit cartesian 
product error msg
 Key: SPARK-24128
 URL: https://issues.apache.org/jira/browse/SPARK-24128
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


The error message given when a query contains an implicit cartesian product 
suggests rewriting the query using {{CROSS JOIN}}, but not disabling the check 
using {{spark.sql.crossJoin.enabled=true}}. It's sometimes easier to change a 
config variable than edit a query, so it would be helpful to make the user 
aware of their options. 
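
For reference, the two options the message could point at (table names made up; the config key is the existing one):

{code}
// Option 1: allow the planner to produce the cartesian product via config.
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// Option 2: make the intent explicit in the query instead.
spark.sql("SELECT * FROM t1 CROSS JOIN t2")
{code}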






[jira] [Created] (SPARK-24125) Add quoting rules to SQL guide

2018-04-30 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-24125:
--

 Summary: Add quoting rules to SQL guide
 Key: SPARK-24125
 URL: https://issues.apache.org/jira/browse/SPARK-24125
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


As far as I can tell, Spark SQL's quoting rules are as follows:

* {{`foo bar`}} is an identifier
* {{'foo bar'}} is a string literal
* {{"foo bar"}} is a string literal

The last of these is non-standard (usually {{"foo bar"}} is an identifier), and 
so it's probably worth mentioning these rules in the 'reference' section of the 
[SQL 
guide|http://spark.apache.org/docs/latest/sql-programming-guide.html#reference].

I'm assuming there's not a lot of enthusiasm to change the quoting rules, given 
it would be a breaking change, and that backticks work just fine as an 
alternative. 
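
For reference, the three forms side by side (assuming a table {{t}} with a column literally named {{foo bar}}):

{code}
spark.sql("SELECT `foo bar` FROM t")     // backticks: identifier (the column)
spark.sql("SELECT 'foo bar' FROM t")     // single quotes: string literal
spark.sql("SELECT \"foo bar\" FROM t")   // double quotes: also a string literal in Spark SQL
{code}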






[jira] [Commented] (SPARK-23852) Parquet MR bug can lead to incorrect SQL results

2018-04-24 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451263#comment-16451263
 ] 

Henry Robinson commented on SPARK-23852:


Yes, it has - the Parquet community is going to do a 1.8.3 release, mostly just 
to address this issue for us. Parquet 1.10 has already been released, and includes 
this fix. Upgrading Spark trunk to that version is the subject of SPARK-23972.

> Parquet MR bug can lead to incorrect SQL results
> 
>
> Key: SPARK-23852
> URL: https://issues.apache.org/jira/browse/SPARK-23852
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Blocker
>  Labels: correctness
>
> Parquet MR 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that 
> pushing certain predicates to Parquet scanners can return fewer results than 
> they should.
> The bug triggers in Spark when:
>  * The Parquet file being scanned has stats for the null count, but not the 
> max or min on the column with the predicate (Apache Impala writes files like 
> this).
>  * The vectorized Parquet reader path is not taken, and the parquet-mr reader 
> is used.
>  * A suitable <, <=, > or >= predicate is pushed down to Parquet.
> The bug is that the parquet-mr interprets the max and min of a row-group's 
> column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
> even if some are > 0.
> There is no upstream release of Parquet that contains the fix for 
> PARQUET-1217, although a 1.10 release is planned.
> The least impactful workaround is to set the Parquet configuration 
> {{parquet.filter.stats.enabled}} to {{false}}.






[jira] [Commented] (SPARK-24020) Sort-merge join inner range optimization

2018-04-18 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16443392#comment-16443392
 ] 

Henry Robinson commented on SPARK-24020:


This sounds like a 'band join' (e.g. 
http://pages.cs.wisc.edu/~dewitt/includes/paralleldb/vldb91.pdf, also see 
[Oracle's 
documentation|https://docs.oracle.com/en/database/oracle/oracle-database/12.2/sqlrf/Joins.html#GUID-568EC26F-199A-4339-BFD9-C4A0B9588937]).
 

Does your implementation also handle non-equi joins? E.g. {{WHERE t1.Y BETWEEN 
t2.Y - d AND t2.Y + d}}, with no equality clause in the join predicates. 
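
For reference, the two shapes I mean (a sketch with made-up table and column names):

{code}
// Equi-join key plus the range ("band") condition - the case the proposal targets:
spark.sql("SELECT * FROM t1 JOIN t2 ON t1.X = t2.X AND t1.Y BETWEEN t2.Y - 5 AND t2.Y + 5")

// Pure band join, no equality clause - the case asked about above:
spark.sql("SELECT * FROM t1 JOIN t2 ON t1.Y BETWEEN t2.Y - 5 AND t2.Y + 5")
{code}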

> Sort-merge join inner range optimization
> 
>
> Key: SPARK-24020
> URL: https://issues.apache.org/jira/browse/SPARK-24020
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Petar Zecevic
>Priority: Major
>
> The problem we are solving is the case where you have two big tables 
> partitioned by X column, but also sorted by Y column (within partitions) and 
> you need to calculate an expensive function on the joined rows. During a 
> sort-merge join, Spark will do cross-joins of all rows that have the same X 
> values and calculate the function's value on all of them. If the two tables 
> have a large number of rows per X, this can result in a huge number of 
> calculations.
> We hereby propose an optimization that would allow you to reduce the number 
> of matching rows per X using a range condition on Y columns of the two 
> tables. Something like:
> ... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
> The way SMJ is currently implemented, these extra conditions have no 
> influence on the number of rows (per X) being checked because these extra 
> conditions are put in the same block with the function being calculated.
> Here we propose a change to the sort-merge join so that, when these extra 
> conditions are specified, a queue is used instead of the 
> ExternalAppendOnlyUnsafeRowArray class. This queue would then be used as a 
> moving window across the values from the right relation as the left row 
> changes. You could call this a combination of an equi-join and a theta join 
> (we call it "sort-merge inner range join").
> Potential use-cases for this are joins based on spatial or temporal distance 
> calculations.
> The optimization should be triggered automatically when an equi-join 
> expression is present AND lower and upper range conditions on a secondary 
> column are specified. If the tables aren't sorted by both columns, 
> appropriate sorts should be added.
> To limit the impact of this change we also propose adding a new parameter 
> (tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which 
> could be used to switch off the optimization entirely.
>  






[jira] [Created] (SPARK-23973) Remove consecutive sorts

2018-04-12 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23973:
--

 Summary: Remove consecutive sorts
 Key: SPARK-23973
 URL: https://issues.apache.org/jira/browse/SPARK-23973
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


As a follow-on from SPARK-23375, it would be easy to remove redundant sorts in 
the following kind of query:

{code}
Seq((1), (3)).toDF("int").orderBy('int.asc).orderBy('int.desc).explain()

== Physical Plan ==
*(2) Sort [int#35 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(int#35 DESC NULLS LAST, 200)
   +- *(1) Sort [int#35 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(int#35 ASC NULLS FIRST, 200)
         +- LocalTableScan [int#35]
{code}

There's no need to perform {{(1) Sort}}. Since the sort operator isn't stable, 
AFAIK, it should be ok to remove a sort on any column that gets 'overwritten' 
by a subsequent one in this way. 
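
A hedged sketch of the kind of optimizer rule this suggests (illustration only, not the eventual implementation):

{code}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

object RemoveConsecutiveSorts extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // A Sort directly on top of another Sort makes the inner ordering irrelevant,
    // so the inner Sort can be dropped.
    case outer @ Sort(_, _, Sort(_, _, grandChild)) => outer.copy(child = grandChild)
  }
}
{code}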






[jira] [Created] (SPARK-23972) Upgrade to Parquet 1.10

2018-04-12 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23972:
--

 Summary: Upgrade to Parquet 1.10
 Key: SPARK-23972
 URL: https://issues.apache.org/jira/browse/SPARK-23972
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Henry Robinson


Now that Parquet 1.10 is out, we should consider moving to it, if for no other 
reason than to address SPARK-23852. 

There are some non-trivial changes to address. In particular, the [ValuesReader 
interface|https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java]
 has changed from passing data as {{byte[]}} to a {{ByteBufferInputStream}} 
abstraction that can hide multiple {{ByteBuffer}}s under the hood.






[jira] [Commented] (SPARK-23852) Parquet MR bug can lead to incorrect SQL results

2018-04-12 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436292#comment-16436292
 ] 

Henry Robinson commented on SPARK-23852:


[Here's a branch|https://github.com/henryr/spark/tree/spark-23852] with a test 
that fails if PARQUET-1217 is not fixed.

> Parquet MR bug can lead to incorrect SQL results
> 
>
> Key: SPARK-23852
> URL: https://issues.apache.org/jira/browse/SPARK-23852
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Blocker
>  Labels: correctness
>
> Parquet MR 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that 
> pushing certain predicates to Parquet scanners can return fewer results than 
> they should.
> The bug triggers in Spark when:
>  * The Parquet file being scanned has stats for the null count, but not the 
> max or min on the column with the predicate (Apache Impala writes files like 
> this).
>  * The vectorized Parquet reader path is not taken, and the parquet-mr reader 
> is used.
>  * A suitable <, <=, > or >= predicate is pushed down to Parquet.
> The bug is that the parquet-mr interprets the max and min of a row-group's 
> column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
> even if some are > 0.
> There is no upstream release of Parquet that contains the fix for 
> PARQUET-1217, although a 1.10 release is planned.
> The least impactful workaround is to set the Parquet configuration 
> {{parquet.filter.stats.enabled}} to {{false}}.






[jira] [Created] (SPARK-23957) Sorts in subqueries are redundant and can be removed

2018-04-10 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23957:
--

 Summary: Sorts in subqueries are redundant and can be removed
 Key: SPARK-23957
 URL: https://issues.apache.org/jira/browse/SPARK-23957
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


Unless combined with a {{LIMIT}}, there's no correctness reason that planned 
and optimized subqueries should have any sort operators (since the result of 
the subquery is an unordered collection of tuples). 

For example:

{{SELECT count(1) FROM (select id FROM dft ORDER by id)}}

has the following plan:
{code:java}
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(2) Project
         +- *(2) Sort [id#0L ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 200)
               +- *(1) Project [id#0L]
                  +- *(1) FileScan parquet [id#0L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
{code}
... but the sort operator is redundant.

Less intuitively, the sort is also redundant in selections from an ordered 
subquery:

{{SELECT * FROM (SELECT id FROM dft ORDER BY id)}}

has plan:
{code:java}
== Physical Plan ==
*(2) Sort [id#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 200)
   +- *(1) Project [id#0L]
      +- *(1) FileScan parquet [id#0L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
{code}
... but again, since the subquery returns a bag of tuples, the sort is 
unnecessary.

We should consider adding an optimizer rule that removes a sort inside a 
subquery. SPARK-23375 is related, but removes sorts that are functionally 
redundant because they perform the same ordering.
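
As a strawman, something like the following rule would cover the first example (a sketch only - a real rule would need to recognize all the order-insensitive parent operators, and leave sorts that feed a {{LIMIT}} alone):

{code}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

object RemoveSortBelowAggregate extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // An aggregate's result doesn't depend on its input order, so a Sort that
    // feeds directly into it can be dropped.
    case agg @ Aggregate(_, _, Sort(_, _, child)) => agg.copy(child = child)
  }
}
{code}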
  






[jira] [Commented] (SPARK-23852) Parquet MR bug can lead to incorrect SQL results

2018-04-02 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423491#comment-16423491
 ] 

Henry Robinson commented on SPARK-23852:


Partly, but not completely. If the column is dictionary encoded, the filters 
are applied to the dictionary as well at the row-group level. See 
[RowGroupFilter|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java#L92].

> Parquet MR bug can lead to incorrect SQL results
> 
>
> Key: SPARK-23852
> URL: https://issues.apache.org/jira/browse/SPARK-23852
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Blocker
>  Labels: correctness
>
> Parquet MR 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that 
> pushing certain predicates to Parquet scanners can return fewer results than 
> they should.
> The bug triggers in Spark when:
>  * The Parquet file being scanned has stats for the null count, but not the 
> max or min on the column with the predicate (Apache Impala writes files like 
> this).
>  * The vectorized Parquet reader path is not taken, and the parquet-mr reader 
> is used.
>  * A suitable <, <=, > or >= predicate is pushed down to Parquet.
> The bug is that the parquet-mr interprets the max and min of a row-group's 
> column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
> even if some are > 0.
> There is no upstream release of Parquet that contains the fix for 
> PARQUET-1217, although a 1.10 release is planned.
> The least impactful workaround is to set the Parquet configuration 
> {{parquet.filter.stats.enabled}} to {{false}}.






[jira] [Updated] (SPARK-23852) Parquet MR bug can lead to incorrect SQL results

2018-04-02 Thread Henry Robinson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Robinson updated SPARK-23852:
---
Description: 
Parquet MR 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that 
pushing certain predicates to Parquet scanners can return fewer results than 
they should.

The bug triggers in Spark when:
 * The Parquet file being scanned has stats for the null count, but not the max 
or min on the column with the predicate (Apache Impala writes files like this).
 * The vectorized Parquet reader path is not taken, and the parquet-mr reader 
is used.
 * A suitable <, <=, > or >= predicate is pushed down to Parquet.

The bug is that the parquet-mr interprets the max and min of a row-group's 
column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
even if some are > 0.

There is no upstream release of Parquet that contains the fix for PARQUET-1217, 
although a 1.10 release is planned.

The least impactful workaround is to set the Parquet configuration 
{{parquet.filter.stats.enabled}} to {{false}}.

  was:
Parquet 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that pushing 
certain predicates to Parquet scanners can return fewer results than they 
should. 

The bug triggers in Spark when:

* The Parquet file being scanned has stats for the null count, but not the max 
or min on the column with the predicate (Apache Impala writes files like this).
* The vectorized Parquet reader path is not taken, and the parquet-mr reader is 
used. 
* A suitable <, <=, > or >= predicate is pushed down to Parquet. 

The bug is that the parquet-mr interprets the max and min of a row-group's 
column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
even if some are  > 0.

There is no upstream release of Parquet that contains the fix for PARQUET-1217, 
although a 1.10 release is planned.

The least impactful workaround is to set the Parquet configuration 
{{parquet.filter.stats.enabled}} to {{false}}. 


> Parquet MR bug can lead to incorrect SQL results
> 
>
> Key: SPARK-23852
> URL: https://issues.apache.org/jira/browse/SPARK-23852
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Critical
>
> Parquet MR 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that 
> pushing certain predicates to Parquet scanners can return fewer results than 
> they should.
> The bug triggers in Spark when:
>  * The Parquet file being scanned has stats for the null count, but not the 
> max or min on the column with the predicate (Apache Impala writes files like 
> this).
>  * The vectorized Parquet reader path is not taken, and the parquet-mr reader 
> is used.
>  * A suitable <, <=, > or >= predicate is pushed down to Parquet.
> The bug is that the parquet-mr interprets the max and min of a row-group's 
> column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
> even if some are > 0.
> There is no upstream release of Parquet that contains the fix for 
> PARQUET-1217, although a 1.10 release is planned.
> The least impactful workaround is to set the Parquet configuration 
> {{parquet.filter.stats.enabled}} to {{false}}.






[jira] [Created] (SPARK-23852) Parquet MR bug can lead to incorrect SQL results

2018-04-02 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23852:
--

 Summary: Parquet MR bug can lead to incorrect SQL results
 Key: SPARK-23852
 URL: https://issues.apache.org/jira/browse/SPARK-23852
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Henry Robinson


Parquet 1.9.0 and 1.8.2 both have a bug, PARQUET-1217, that means that pushing 
certain predicates to Parquet scanners can return fewer results than they 
should. 

The bug triggers in Spark when:

* The Parquet file being scanned has stats for the null count, but not the max 
or min on the column with the predicate (Apache Impala writes files like this).
* The vectorized Parquet reader path is not taken, and the parquet-mr reader is 
used. 
* A suitable <, <=, > or >= predicate is pushed down to Parquet. 

The bug is that the parquet-mr interprets the max and min of a row-group's 
column as 0 in the absence of stats. So {{col > 0}} will filter all results, 
even if some are  > 0.

There is no upstream release of Parquet that contains the fix for PARQUET-1217, 
although a 1.10 release is planned.

The least impactful workaround is to set the Parquet configuration 
{{parquet.filter.stats.enabled}} to {{false}}. 
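
A minimal sketch of applying that workaround from a Spark session (the key is the one named above; whether to set it globally or per job is a deployment choice):

{code}
// Disable Parquet's stats-based row-group filtering for reads through this
// SparkContext, sidestepping PARQUET-1217 at some cost in scan pruning.
spark.sparkContext.hadoopConfiguration.set("parquet.filter.stats.enabled", "false")
{code}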






[jira] [Commented] (SPARK-23576) SparkSQL - Decimal data missing decimal point

2018-03-12 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395803#comment-16395803
 ] 

Henry Robinson commented on SPARK-23576:


Do you have a smaller repro, or does it only reproduce if you create all three 
tables? 

> SparkSQL - Decimal data missing decimal point
> -
>
> Key: SPARK-23576
> URL: https://issues.apache.org/jira/browse/SPARK-23576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: spark 2.3.0
> linux
>Reporter: R
>Priority: Major
>
> Integers like 3 stored as a decimal display in sparksql as 300 with 
> no decimal point. But hive displays fine as 3.
> Repro steps:
>  # Create a .csv with the value 3
>  # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC 
> file
>  # Use spark to read the ORC, infer the schema (it will infer 38,18 
> precision) and output to a Parquet file
>  # Create external hive table to read the parquet ( define the hive type as 
> decimal(31,8))
>  # Use spark-sql to select from the external hive table.
>  # Notice how sparksql shows 300    !!!
>  






[jira] [Created] (SPARK-23634) AttributeReferences may be too conservative wrt nullability after optimization

2018-03-08 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23634:
--

 Summary: AttributeReferences may be too conservative wrt 
nullability after optimization
 Key: SPARK-23634
 URL: https://issues.apache.org/jira/browse/SPARK-23634
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


An {{AttributeReference}} effectively caches the nullability of its referent 
when it is created. Some optimization rules can transform a nullable attribute 
into a non-nullable one, but the references to it are not updated. 

We could add a transformation rule that visits every {{AttributeReference}} and 
fixes its nullability after optimization. 
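
A hedged sketch of what such a pass might look like (illustration only; a real rule would need care around every plan node and around outer joins):

{code}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object UpdateAttributeNullability extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p =>
      // Re-derive each reference's nullability from the child that produces it.
      val produced = p.children.flatMap(_.output).map(a => a.exprId -> a).toMap
      p.transformExpressions {
        case a: AttributeReference =>
          produced.get(a.exprId).map(b => a.withNullability(b.nullable)).getOrElse(a)
      }
  }
}
{code}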






[jira] [Created] (SPARK-23606) Flakey FileBasedDataSourceSuite

2018-03-05 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23606:
--

 Summary: Flakey FileBasedDataSourceSuite
 Key: SPARK-23606
 URL: https://issues.apache.org/jira/browse/SPARK-23606
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


I've seen the following exception twice today in PR builds (one example: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87978/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/).
 It's not deterministic, as I've had one PR build pass in the same span.

{code:java}

sbt.ForkMain$ForkError: 
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 15 times over 10.016101897 
seconds. Last failure message: There are 1 possibly leaked file streams..
at 
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
at 
org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
at 
org.apache.spark.sql.FileBasedDataSourceSuite.eventually(FileBasedDataSourceSuite.scala:30)
at 
org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
at 
org.apache.spark.sql.FileBasedDataSourceSuite.eventually(FileBasedDataSourceSuite.scala:30)
at 
org.apache.spark.sql.test.SharedSparkSession$class.afterEach(SharedSparkSession.scala:114)
at 
org.apache.spark.sql.FileBasedDataSourceSuite.afterEach(FileBasedDataSourceSuite.scala:30)
at 
org.scalatest.BeforeAndAfterEach$$anonfun$1.apply$mcV$sp(BeforeAndAfterEach.scala:234)
at 
org.scalatest.Status$$anonfun$withAfterEffect$1.apply(Status.scala:379)
at 
org.scalatest.Status$$anonfun$withAfterEffect$1.apply(Status.scala:375)
at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
at org.scalatest.Status$class.withAfterEffect(Status.scala:375)
at org.scalatest.SucceededStatus$.withAfterEffect(Status.scala:426)
at 
org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:232)
at 
org.apache.spark.sql.FileBasedDataSourceSuite.runTest(FileBasedDataSourceSuite.scala:30)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at 
org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
at sbt.ForkMain$Run$2.call(ForkMain.java:296)
at sbt.ForkMain$Run$2.call(ForkMain.java:286)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: sbt.ForkMain$ForkError: java.lang.IllegalStateException: There are 1 
possibly leaked file streams.
at 
org.apache.spark.DebugFilesystem$.assertNoOpenStreams(DebugFilesystem.scala:54)
at 
org.apache.spark.sql.test.SharedSparkSession$$anonfun$afterEach$1.apply$mcV$sp(SharedSparkSession.scala:115)
at 
org.apache.spark.sql.test.SharedSparkSession$$anonfun$afterEach$1.apply(SharedSparkSession.scala:115)
{code}

[jira] [Updated] (SPARK-23604) ParquetInteroperabilityTest timestamp test should use Statistics.hasNonNullValue

2018-03-05 Thread Henry Robinson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Robinson updated SPARK-23604:
---
Description: 
We ran into an issue with a downstream build of Spark running against a custom 
Parquet build where {{ParquetInteroperabilityTestSuite}} started failing 
because {{Statistics.isEmpty}} changed its behavior as of PARQUET-1217. 
({{isEmpty()}} now considers whether there are 0 or more nulls, and by default 
{{num_nulls}} is 0 for 'empty' stats objects).

The test really cares about whether the statistics object has values, so a very 
simple fix to use {{hasNonNullValue}} instead corrects the issue. Filing it now 
because it's a backwards-compatible fix to the current Parquet version so we 
can fix it right now before we hit the issue in the future. 

  was:
We ran into an issue with a downstream build of Spark running against a custom 
Parquet build where {{ParquetInteroperabilityTestSuite}} started failing 
because {{Statistics.isEmpty}} changed its behavior as of PARQUET-1217. 
({{isEmpty() now considers whether there are 0 or more nulls, and by default 
{{num_nulls}} is 0 for 'empty' stats objects).

The test really cares about whether the statistics object has values, so a very 
simple fix to use {{hasNonNullValue}} instead corrects the issue. Filing it now 
because it's a backwards-compatible fix to the current Parquet version so we 
can fix it right now before we hit the issue in the future. 


> ParquetInteroperabilityTest timestamp test should use 
> Statistics.hasNonNullValue
> 
>
> Key: SPARK-23604
> URL: https://issues.apache.org/jira/browse/SPARK-23604
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Minor
>
> We ran into an issue with a downstream build of Spark running against a 
> custom Parquet build where {{ParquetInteroperabilityTestSuite}} started 
> failing because {{Statistics.isEmpty}} changed its behavior as of 
> PARQUET-1217. ({{isEmpty()}} now considers whether there are 0 or more nulls, 
> and by default {{num_nulls}} is 0 for 'empty' stats objects).
> The test really cares about whether the statistics object has values, so a 
> very simple fix to use {{hasNonNullValue}} instead corrects the issue. Filing 
> it now because it's a backwards-compatible fix to the current Parquet version 
> so we can fix it right now before we hit the issue in the future. 






[jira] [Created] (SPARK-23604) ParquetInteroperabilityTest timestamp test should use Statistics.hasNonNullValue

2018-03-05 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23604:
--

 Summary: ParquetInteroperabilityTest timestamp test should use 
Statistics.hasNonNullValue
 Key: SPARK-23604
 URL: https://issues.apache.org/jira/browse/SPARK-23604
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Henry Robinson


We ran into an issue with a downstream build of Spark running against a custom 
Parquet build where {{ParquetInteroperabilityTestSuite}} started failing 
because {{Statistics.isEmpty}} changed its behavior as of PARQUET-1217. 
({{isEmpty()}} now considers whether there are 0 or more nulls, and by default 
{{num_nulls}} is 0 for 'empty' stats objects).

The test really cares about whether the statistics object has values, so a very 
simple fix to use {{hasNonNullValue}} instead corrects the issue. Filing it now 
because it's a backwards-compatible fix to the current Parquet version so we 
can fix it right now before we hit the issue in the future. 






[jira] [Commented] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-27 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379201#comment-16379201
 ] 

Henry Robinson commented on SPARK-23500:


Ok, I figured it out! {{SimplifyCreateStructOps}} does not get applied 
recursively across the whole plan, just across the expressions recursively 
reachable from the root. So even the following:

{{df.filter("named_struct('id', id, 'id2', id2).id > 1").select("id2")}}

doesn't trigger the rule because the {{named_struct}} is never seen. 

Changing the rule to walk the plan, and then walk the expression trees rooted 
at each node, caused the optimization to trigger.

{code}
object SimplifyCreateStructOps extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
    p.transformExpressionsUp {
      case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
        createNamedStructLike.valExprs(ordinal)
    }
  }
}
{code}



> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Major
>
> Simple filters on dataframes joined with {{joinWith()}} are missing an 
> opportunity to get pushed into the scan because they're written in terms of 
> {{named_struct}} that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
> 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> struct, false].id))
>+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
>   +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
>  +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: 
> [], PushedFilters: [], ReadSchema: struct
> {code}
> Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
> is then pushed down. When the filter is just above the scan, the 
> wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
> removed. Then the filter can be pushed down to Parquet.






[jira] [Comment Edited] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-26 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377802#comment-16377802
 ] 

Henry Robinson edited comment on SPARK-23500 at 2/27/18 7:08 AM:
-

There's an optimizer rule, {{SimplifyCreateStructOps}}, that should be kicking 
in here. In a simpler plan, it correctly unnests the {{named_struct}}:

{code:java}
scala> df.filter("named_struct('id', id).id > 1").explain
== Physical Plan ==
*(1) Project [id#0L, id2#1L]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 1))
+- *(1) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,1)], 
ReadSchema: struct
{code}


was (Author: henryr):
There's an optimizer rule, {{SimplifyCreateStructOps}}, that should be kicking 
in here. In a simpler plan, it correctly unnests the {{named_struct}}:

{code:java}
scala> df.filter("named_struct('id', id).id > 1").explain
== Physical Plan ==
*(1) Project [id#0L, id2#1L]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 1))
+- *(1) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,1)], 
ReadSchema: struct

scala> df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain
:32: error: value project is not a member of 
org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, 
org.apache.spark.sql.Row)]
df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain{code}

> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Major
>
> Simple filters on dataframes joined with {{joinWith()}} are missing an 
> opportunity to get pushed into the scan because they're written in terms of 
> {{named_struct}} that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
> 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> struct, false].id))
>+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
>   +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
>  +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: 
> [], PushedFilters: [], ReadSchema: struct
> {code}
> Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
> is then pushed down. When the filter is just above the scan, the 
> wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
> removed. Then the filter can be pushed down to Parquet.






[jira] [Comment Edited] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-26 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377802#comment-16377802
 ] 

Henry Robinson edited comment on SPARK-23500 at 2/26/18 11:51 PM:
--

There's an optimizer rule, {{SimplifyCreateStructOps}}, that should be kicking 
in here. In a simpler plan, it correctly unnests the {{named_struct}}:

{code:java}
scala> df.filter("named_struct('id', id).id > 1").explain
== Physical Plan ==
*(1) Project [id#0L, id2#1L]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 1))
+- *(1) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,1)], 
ReadSchema: struct

scala> df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain
:32: error: value project is not a member of 
org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, 
org.apache.spark.sql.Row)]
df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain{code}


was (Author: henryr):
There's an optimizer rule, {{SimplifyCreateStructOps}}, that should be kicking 
in here. In a simpler plan, it correctly unnests the {{named_struct}}:

{code:java}
scala> df.filter("named_struct('id', id).id > 1").explain
== Physical Plan ==
*(1) Project [id#0L, id2#1L]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 1))
+- *(1) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,1)], 
ReadSchema: struct

scala> df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain
:32: error: value project is not a member of 
org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, 
org.apache.spark.sql.Row)]
df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain{code}

> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Major
>
> Simple filters on dataframes joined with {{joinWith()}} are missing an 
> opportunity to get pushed into the scan because they're written in terms of 
> {{named_struct}} that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
> 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> struct, false].id))
>+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
>   +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
>  +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: 
> [], PushedFilters: [], ReadSchema: struct
> {code}
> Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
> is then pushed down. When the filter is just above the scan, the 
> wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
> removed. Then the filter can be pushed down to Parquet.






[jira] [Commented] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-26 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377802#comment-16377802
 ] 

Henry Robinson commented on SPARK-23500:


There's an optimizer rule, {{SimplifyCreateStructOps}}, that should be kicking 
in here. In a simpler plan, it correctly unnests the {{named_struct}}:

{code:java}
scala> df.filter("named_struct('id', id).id > 1").explain
== Physical Plan ==
*(1) Project [id#0L, id2#1L]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 1))
+- *(1) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,1)], 
ReadSchema: struct

scala> df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain
:32: error: value project is not a member of 
org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, 
org.apache.spark.sql.Row)]
df.joinWith(df2, df.col("id") === 
df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain{code}

> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Major
>
> Simple filters on dataframes joined with {{joinWith()}} are missing an 
> opportunity to get pushed into the scan because they're written in terms of 
> {{named_struct}} that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
> 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> struct, false].id))
>+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
>   +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
>  +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: 
> [], PushedFilters: [], ReadSchema: struct
> {code}
> Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
> is then pushed down. When the filter is just above the scan, the 
> wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
> removed. Then the filter can be pushed down to Parquet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-23 Thread Henry Robinson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Robinson updated SPARK-23500:
---
Description: 
Simple filters on dataframes joined with {{joinWith()}} are missing an 
opportunity to get pushed into the scan because they're written in terms of 
{{named_struct}} that could be removed by the optimizer.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
:  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
struct, false].id))
   +- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
  +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
 +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: 
Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
is then pushed down. When the filter is just above the scan, the 
wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
removed. Then the filter can be pushed down to Parquet.


  was:
Simple filters on dataframes joined with {{joinWith()}} are missing an 
opportunity to get pushed into the scan because they're written in terms of 
{{named_struct}} that could be removed by the optimizer.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
: +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
struct, false].id))
+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
+- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
+- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
is then pushed down. When the filter is just above the scan, the 
wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
removed. Then the filter can be pushed down to Parquet.



> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Major
>
> Simple filters on dataframes joined with {{joinWith()}} are missing an 
> opportunity to get pushed into the scan because they're written in terms of 
> {{named_struct}} that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
> 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
> Location: 

[jira] [Created] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-23 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23500:
--

 Summary: Filters on named_structs could be pushed into scans
 Key: SPARK-23500
 URL: https://issues.apache.org/jira/browse/SPARK-23500
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Henry Robinson


Simple filters on dataframes joined with {{joinWith()}} are missing an 
opportunity to get pushed into the scan because they're written in terms of 
{{named_struct}} that could be removed by the optimizer.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
: +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
struct, false].id))
+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
+- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
+- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
is then pushed down. When the filter is just above the scan, the 
wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
removed. Then the filter can be pushed down to Parquet.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23157) withColumn fails for a column that is a result of mapped DataSet

2018-01-29 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16343962#comment-16343962
 ] 

Henry Robinson commented on SPARK-23157:


[~kretes] - I can see an argument for the behaviour you're describing, but 
that's apparently not the way the API is intended to be used. Like Sean says, 
there are way too many ways to shoot yourself in the foot if you can stitch 
together arbitrary, column-wise incompatible Datasets like this, and allowing 
the relatively small subset of cases where it would work would make the API 
more confusing, IMO. 

The documentation for {{withColumn()}} could be updated to make this clearer; 
if I get a moment today I'll submit a PR. 

> withColumn fails for a column that is a result of mapped DataSet
> 
>
> Key: SPARK-23157
> URL: https://issues.apache.org/jira/browse/SPARK-23157
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Tomasz Bartczak
>Priority: Minor
>
> Having 
> {code:java}
> case class R(id: String)
> val ds = spark.createDataset(Seq(R("1")))
> {code}
> This works:
> {code}
> scala> ds.withColumn("n", ds.col("id"))
> res16: org.apache.spark.sql.DataFrame = [id: string, n: string]
> {code}
> but when we map over ds it fails:
> {code}
> scala> ds.withColumn("n", ds.map(a => a).col("id"))
> org.apache.spark.sql.AnalysisException: resolved attribute(s) id#55 missing 
> from id#4 in operator !Project [id#4, id#55 AS n#57];;
> !Project [id#4, id#55 AS n#57]
> +- LocalRelation [id#4]
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:347)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
>   at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
>   at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2884)
>   at org.apache.spark.sql.Dataset.select(Dataset.scala:1150)
>   at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:1905)
>   ... 48 elided
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23157) withColumn fails for a column that is a result of mapped DataSet

2018-01-25 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340412#comment-16340412
 ] 

Henry Robinson commented on SPARK-23157:


I'm not sure if this should actually be expected to work. {{Dataset.map()}} 
will always return a dataset with a logical plan that's different to the 
original, so {{ds.map(a => a).col("id")}} has an expression that refers to an 
attribute ID that isn't produced by the original dataset. It seems like the 
requirement for {{ds.withColumn()}} is that the column argument is an 
expression over {{ds}}'s logical plan.

You get the same error doing the following, which is more explicit about these 
being two separate datasets.
{code:java}
scala> val ds = spark.createDataset(Seq(R("1")))
ds: org.apache.spark.sql.Dataset[R] = [id: string]

scala> val ds2 = spark.createDataset(Seq(R("1")))
ds2: org.apache.spark.sql.Dataset[R] = [id: string]

scala> ds.withColumn("id2", ds2.col("id"))
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#113 missing 
from id#1 in operator !Project [id#1, id#113 AS id2#115]. Attribute(s) with the 
same name appear in the operation: id. Please check if the right attribute(s) 
are used.;;
!Project [id#1, id#113 AS id2#115]
+- LocalRelation [id#1]

  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:297)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
  at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:70)
  at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3286)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:1303)
  at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2185)
  at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2152)
  ... 49 elided
{code}
If the {{map}} function weren't the identity, would you expect this still to 
work?
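
For completeness, the supported pattern is to resolve the column against the same 
Dataset it will be added to. A small sketch, assuming the same shell session as above 
(case class {{R}} and {{ds}} already defined):

{code:java}
// Resolve the column against the mapped Dataset itself, so the attribute IDs
// line up with that Dataset's logical plan.
val mapped = ds.map(a => a)
mapped.withColumn("n", mapped.col("id")).show()

// Or refer to the column by name and let analysis resolve it:
ds.map(a => a).withColumn("n", org.apache.spark.sql.functions.col("id")).show()
{code}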

> withColumn fails for a column that is a result of mapped DataSet
> 
>
> Key: SPARK-23157
> URL: https://issues.apache.org/jira/browse/SPARK-23157
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Tomasz Bartczak
>Priority: Minor
>
> Having 
> {code:java}
> case class R(id: String)
> val ds = spark.createDataset(Seq(R("1")))
> {code}
> This works:
> {code}
> scala> ds.withColumn("n", ds.col("id"))
> res16: org.apache.spark.sql.DataFrame = [id: string, n: string]
> {code}
> but when we map over ds it fails:
> {code}
> scala> ds.withColumn("n", ds.map(a => a).col("id"))
> org.apache.spark.sql.AnalysisException: resolved attribute(s) id#55 missing 
> from id#4 in operator !Project [id#4, id#55 AS n#57];;
> !Project [id#4, id#55 AS n#57]
> +- LocalRelation [id#4]
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:347)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
>   at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
>   at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2884)
>   at org.apache.spark.sql.Dataset.select(Dataset.scala:1150)
>   at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:1905)
>   ... 48 elided
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23148) spark.read.csv with multiline=true gives FileNotFoundException if path contains spaces

2018-01-19 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332999#comment-16332999
 ] 

Henry Robinson commented on SPARK-23148:


It seems like the problem is that 
{{CodecStreams.createInputStreamWithCloseResource}} can't properly handle a 
{{path}} argument that's URL-encoded. We could add an overload for 
{{createInputStreamWithCloseResource(Configuration, Path)}} and then pass {{new 
Path(new URI(path))}} from {{CSVDataSource.readFile()}}. This has the benefit 
of being a more localised change (and doesn't change the 'contract' that comes 
from {{FileScanRDD}} currently having URL-encoded pathnames everywhere. A 
strawman commit is 
[here|https://github.com/henryr/spark/commit/b8c51418ee7d4bca18179fd863f7f4885c98c0ef].
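
Roughly this shape, as a simplified sketch of the proposed overload. It is not the 
actual {{CodecStreams}} code (the real helper also deals with compression codecs), and 
{{file.filePath}} below is just a stand-in for the URL-encoded string that 
{{FileScanRDD}} hands out:

{code:java}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Proposed overload: accept a Hadoop Path directly, leaving URL-decoding to the caller.
def createInputStreamWithCloseResource(conf: Configuration, path: Path): java.io.InputStream = {
  val fs = path.getFileSystem(conf)
  fs.open(path) // simplified: codec/decompression handling omitted
}

// Call-site sketch for CSVDataSource.readFile(), decoding the URL-encoded path first:
// createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
{code}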

> spark.read.csv with multiline=true gives FileNotFoundException if path 
> contains spaces
> --
>
> Key: SPARK-23148
> URL: https://issues.apache.org/jira/browse/SPARK-23148
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bogdan Raducanu
>Priority: Major
>
> Repro code:
> {code:java}
> spark.range(10).write.csv("/tmp/a b c/a.csv")
> spark.read.option("multiLine", false).csv("/tmp/a b c/a.csv").count
> 10
> spark.read.option("multiLine", true).csv("/tmp/a b c/a.csv").count
> java.io.FileNotFoundException: File 
> file:/tmp/a%20b%20c/a.csv/part-0-cf84f9b2-5fe6-4f54-a130-a1737689db00-c000.csv
>  does not exist
> {code}
> Trying to manually escape fails in a different place:
> {code}
> spark.read.option("multiLine", true).csv("/tmp/a%20b%20c/a.csv").count
> org.apache.spark.sql.AnalysisException: Path does not exist: 
> file:/tmp/a%20b%20c/a.csv;
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:683)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:387)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:387)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.immutable.List.foreach(List.scala:381)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23148) spark.read.csv with multiline=true gives FileNotFoundException if path contains spaces

2018-01-19 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332999#comment-16332999
 ] 

Henry Robinson edited comment on SPARK-23148 at 1/19/18 11:25 PM:
--

It seems like the problem is that 
{{CodecStreams.createInputStreamWithCloseResource}} can't properly handle a 
{{path}} argument that's URL-encoded. We could add an overload for 
{{createInputStreamWithCloseResource(Configuration, Path)}} and then pass {{new 
Path(new URI(path))}} from {{CSVDataSource.readFile()}}. This has the benefit 
of being a more localised change (and doesn't change the 'contract' that comes 
from {{FileScanRDD}} currently having URL-encoded pathnames everywhere). A 
strawman commit is 
[here|https://github.com/henryr/spark/commit/b8c51418ee7d4bca18179fd863f7f4885c98c0ef].


was (Author: henryr):
It seems like the problem is that 
{{CodecStreams.createInputStreamWithCloseResource}} can't properly handle a 
{{path}} argument that's URL-encoded. We could add an overload for 
{{createInputStreamWithCloseResource(Configuration, Path)}} and then pass {{new 
Path(new URI(path))}} from {{CSVDataSource.readFile()}}. This has the benefit 
of being a more localised change (and doesn't change the 'contract' that comes 
from {{FileScanRDD}} currently having URL-encoded pathnames everywhere. A 
strawman commit is 
[here|https://github.com/henryr/spark/commit/b8c51418ee7d4bca18179fd863f7f4885c98c0ef].

> spark.read.csv with multiline=true gives FileNotFoundException if path 
> contains spaces
> --
>
> Key: SPARK-23148
> URL: https://issues.apache.org/jira/browse/SPARK-23148
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bogdan Raducanu
>Priority: Major
>
> Repro code:
> {code:java}
> spark.range(10).write.csv("/tmp/a b c/a.csv")
> spark.read.option("multiLine", false).csv("/tmp/a b c/a.csv").count
> 10
> spark.read.option("multiLine", true).csv("/tmp/a b c/a.csv").count
> java.io.FileNotFoundException: File 
> file:/tmp/a%20b%20c/a.csv/part-0-cf84f9b2-5fe6-4f54-a130-a1737689db00-c000.csv
>  does not exist
> {code}
> Trying to manually escape fails in a different place:
> {code}
> spark.read.option("multiLine", true).csv("/tmp/a%20b%20c/a.csv").count
> org.apache.spark.sql.AnalysisException: Path does not exist: 
> file:/tmp/a%20b%20c/a.csv;
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:683)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:387)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:387)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.immutable.List.foreach(List.scala:381)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23062) EXCEPT documentation should make it clear that it's EXCEPT DISTINCT

2018-01-12 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23062:
--

 Summary: EXCEPT documentation should make it clear that it's 
EXCEPT DISTINCT
 Key: SPARK-23062
 URL: https://issues.apache.org/jira/browse/SPARK-23062
 Project: Spark
  Issue Type: Improvement
  Components: Documentation, SQL
Affects Versions: 2.0.0
Reporter: Henry Robinson
Priority: Trivial


Between 1.6 and 2.0, the default behaviour for {{EXCEPT}} changed from {{EXCEPT 
ALL}} to {{EXCEPT DISTINCT}}. This is reasonable: postgres defaults to the 
same, and if you try and explicitly use {{EXCEPT ALL}} in 2.0 you get a good 
error message. However, the change was confusing to some users, so it's worth 
being explicit in the documentation.
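
A quick illustration of the difference, as a spark-shell sketch (the view names here 
are made up):

{code:java}
import spark.implicits._

Seq(1, 1, 2).toDF("id").createOrReplaceTempView("l")
Seq(1).toDF("id").createOrReplaceTempView("r")

// EXCEPT means EXCEPT DISTINCT: the duplicated 1 on the left is collapsed
// before the subtraction, so this returns a single row, id = 2.
spark.sql("SELECT id FROM l EXCEPT SELECT id FROM r").show()

// EXCEPT ALL would subtract occurrence-by-occurrence and return rows 1 and 2;
// in 2.0 that syntax is rejected with an error rather than silently treated
// as DISTINCT.
{code}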



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22736) Consider caching decoded dictionaries in VectorizedColumnReader

2017-12-07 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-22736:
--

 Summary: Consider caching decoded dictionaries in 
VectorizedColumnReader
 Key: SPARK-22736
 URL: https://issues.apache.org/jira/browse/SPARK-22736
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.1
Reporter: Henry Robinson


{{VectorizedColumnReader.decodeDictionaryIds()}} calls {{dictionary.decodeToX}} 
for every dictionary ID encountered in a dict-encoded Parquet page.

The whole idea of dictionary encoding is that a) values are repeated in a page 
and b) the dictionary only contains values that are in a page. So we should be 
able to save some decoding cost by decoding the entire dictionary page once, at 
the cost of using some memory (but theoretically we could discard the encoded 
dictionary, I think), and using the decoded dictionary to populate rows. 

This is particularly true for TIMESTAMP data, which after SPARK-12297, might 
have a timezone conversion as part of its decoding step.
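
The shape of the idea, sketched for a long-typed column. This is illustrative only: 
{{VectorizedColumnReader}} itself is Java, and only {{Dictionary.getMaxId}} and 
{{decodeToLong}} below are real Parquet APIs; the wrapper class is made up.

{code:java}
import org.apache.parquet.column.Dictionary

// Decode every dictionary entry once, then serve repeated IDs from the cached array.
class CachedLongDictionary(dict: Dictionary) {
  // getMaxId is inclusive, so the dictionary holds maxId + 1 entries.
  private val decoded: Array[Long] =
    Array.tabulate(dict.getMaxId + 1)(dict.decodeToLong)

  // A lookup is now a plain array read instead of a decode (and, for TIMESTAMP
  // columns, instead of a repeated timezone conversion).
  def lookup(id: Int): Long = decoded(id)
}
{code}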



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238281#comment-16238281
 ] 

Henry Robinson commented on SPARK-22211:


Sounds good, thanks both.

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237778#comment-16237778
 ] 

Henry Robinson commented on SPARK-22211:


[~smilegator] - sounds good! What will your approach be? I wasn't able to see a 
safe way to push the limit through the join without either a more invasive 
rewrite or restricting the set of join operators for FOJ. 

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-30 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225816#comment-16225816
 ] 

Henry Robinson edited comment on SPARK-22211 at 10/30/17 9:59 PM:
--

Thinking about it a bit more, I think the optimization that's currently implemented 
works as long as a) the limit is pushed to the streaming side of the join and 
b) the physical join implementation guarantees that it will emit rows that have 
non-null RHSs from the streaming side before any that have a null RHS.

That is: say the build side has two rows, (A) and (C), and the streaming side 
has (A) and (B). If we do a full outer join of these two inputs, the result 
should be some ordering of (A, A), (null, B), (C, null). If we do a FOJ with LIMIT 1 
pushed to the streaming side, imagine it returns (B). It's an error if the join 
operator sees that A on the build-side has no match, and emits that (A, null) 
before it sees that B has no match and emits (null, B). But if it emits (null, 
B) first, the limit above it should kick in and no further rows will be 
emitted. 

It seems a bit fragile to rely on this behaviour from all join implementations, 
and it has some implications for other transformations (e.g. it would not be 
safe to flip the join order for a FOJ with a pushed-down limit - but would be 
ok for a non-pushed-down one). However, it's also a bit concerning to remove an 
optimization that's probably a big win for some queries, even if it's 
incorrect. There are rewrites that would work, e.g.:

{code}x.join(y, x('bar') === y('bar'), "outer").limit(10) ==> 
x.sort.limit(10).join(y.sort.limit(10), x('bar') === y('bar'), 
"outer").sort.limit(10) {code}

seems like it would be correct. But for now, how about we disable the push-down 
optimization in {{LimitPushDown}} and see if there's a need to investigate more 
complicated optimizations after that?


was (Author: henryr):
Thinking about it a bit more, I think the optimization that's currently implemented 
works as long as a) the limit is pushed to the streaming side of the join and 
b) the physical join implementation guarantees that it will emit rows that have 
non-null RHSs from the streaming side before any that have a null RHS.

That is: say the build side has two rows, (A) and (C), and the streaming side 
has (A) and (B). If we do a full outer join of these two inputs, the result 
should be some ordering of (A, A), (null, B), (C, null). If we do a FOJ with LIMIT 1 
pushed to the streaming side, imagine it returns (B). It's an error if the join 
operator sees that A on the build-side has no match, and emits that (A, null) 
before it sees that B has no match and emits (null, B). But if it emits (null, 
B) first, the limit above it should kick in and no further rows will be 
emitted. 

It seems a bit fragile to rely on this behaviour from all join implementations, 
and it has some implications for other transformations (e.g. it would not be 
safe to flip the join order for a FOJ with a pushed-down limit - but would be 
ok for a non-pushed-down one). However, it's also a bit concerning to remove an 
optimization that's probably a big win for some queries, even if it's 
incorrect. There are rewrites that would work, e.g.:

{{ x.join(y, x('bar') === y('bar'), "outer").limit(10) ==> 
x.sort.limit(10).join(y.sort.limit(10), x('bar') === y('bar'), 
"outer").sort.limit(10) }}

seems like it would be correct. But for now, how about we disable the push-down 
optimization in {{LimitPushDown}} and see if there's a need to investigate more 
complicated optimizations after that?

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 

[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-30 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225816#comment-16225816
 ] 

Henry Robinson commented on SPARK-22211:


Thinking about it a bit more, I think the optimization that's currently implemented 
works as long as a) the limit is pushed to the streaming side of the join and 
b) the physical join implementation guarantees that it will emit rows that have 
non-null RHSs from the streaming side before any that have a null RHS.

That is: say the build side has two rows, (A) and (C), and the streaming side 
has (A) and (B). If we do a full outer join of these two inputs, the result 
should be some ordering of (A, A), (null, B), (C, null). If we do a FOJ with LIMIT 1 
pushed to the streaming side, imagine it returns (B). It's an error if the join 
operator sees that A on the build-side has no match, and emits that (A, null) 
before it sees that B has no match and emits (null, B). But if it emits (null, 
B) first, the limit above it should kick in and no further rows will be 
emitted. 

It seems a bit fragile to rely on this behaviour from all join implementations, 
and it has some implications for other transformations (e.g. it would not be 
safe to flip the join order for a FOJ with a pushed-down limit - but would be 
ok for a non-pushed-down one). However, it's also a bit concerning to remove an 
optimization that's probably a big win for some queries, even if it's 
incorrect. There are rewrites that would work, e.g.:

{{ x.join(y, x('bar') === y('bar'), "outer").limit(10) ==> 
x.sort.limit(10).join(y.sort.limit(10), x('bar') === y('bar'), 
"outer").sort.limit(10) }}

seems like it would be correct. But for now, how about we disable the push-down 
optimization in {{LimitPushDown}} and see if there's a need to investigate more 
complicated optimizations after that?

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-27 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223073#comment-16223073
 ] 

Henry Robinson commented on SPARK-22211:


I think the optimization proposed works only for a self-join. If you transform 
a FOJ into a one-sided outer join in general, you might omit rows from the 
result that should be there. 

Assume that the idea is to transform the full outer-join into a right 
outer-join, with the limit pushed to either the LHS or RHS. One problem comes 
if the limit is pushed to the RHS, but is larger than the number of rows in the 
RHS. For example, if you have tables T1 = (1,2) and T2 = (1), consider the 
following query:

{{SELECT * FROM T1 a FULL OUTER JOIN T2 b on a.col = b.col LIMIT 2}}.

If the limit is pushed into T2's scan (and the FOJ is changed to a ROJ), the 
query would emit only the tuple (1,1) - and omit (2,null) which should be 
included.

(If the limit is pushed into T1's scan there are other bugs if the limit is 
_less_ than the number of rows, by a similar argument).

I checked and I don't think Postgres tries to push limits into a FOJ either. 
Impala doesn't. 
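
The same counter-example in DataFrame form, for anyone who wants to poke at it (a 
sketch that mirrors the query above, with a single column named {{col}}):

{code:java}
import spark.implicits._

val t1 = Seq(1, 2).toDF("col")
val t2 = Seq(1).toDF("col")

// The full outer join has exactly two result rows, (1, 1) and (2, null),
// so LIMIT 2 must return both of them.
t1.join(t2, t1("col") === t2("col"), "outer").limit(2).show(false)

// If the limit were pushed into t2's side and the join then rewritten as a
// right outer join, only (1, 1) could ever be produced and (2, null) would
// be silently dropped.
{code}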


> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org