[jira] [Created] (SPARK-27956) Allow subqueries as partition filter

2019-06-05 Thread Johannes Mayer (JIRA)
Johannes Mayer created SPARK-27956:
--

 Summary: Allow subqueries as partition filter
 Key: SPARK-27956
 URL: https://issues.apache.org/jira/browse/SPARK-27956
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Johannes Mayer


Subqueries are not pushed down as partition filters. See the following example:

 
{code:java}
create table user_mayerjoh.tab (c1 string)
partitioned by (c2 string)
stored as parquet;
{code}
 

 
{code:java}
explain select * from user_mayerjoh.tab where c2 < 1;{code}
 

== Physical Plan ==

*(1) FileScan parquet user_mayerjoh.tab[c1#22,c2#23] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, *PartitionFilters: [isnotnull(c2#23), (cast(c2#23 as int) < 1)]*, PushedFilters: [], ReadSchema: struct<c1:string>

 

 
{code:java}
explain select * from user_mayerjoh.tab where c2 < (select 1);{code}
 

== Physical Plan ==

+- *(1) FileScan parquet user_mayerjoh.tab[c1#30,c2#31] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, *PartitionFilters: [isnotnull(c2#31)]*, PushedFilters: [], ReadSchema: struct<c1:string>

 

Is it possible to first execute the subquery and use the result as a partition filter?
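
One possible workaround until then (a sketch, assuming a SparkSession named spark and the table from above): evaluate the scalar subquery eagerly on the driver and inline its result as a literal, which does get pushed down as a partition filter:

{code:java}
// Run the subquery first and collect its single result on the driver.
val threshold = spark.sql("select 1").collect()(0).getInt(0)

// Inline the value as a literal; the plan now lists the predicate under
// PartitionFilters, as in the first explain above.
spark.sql(s"select * from user_mayerjoh.tab where c2 < $threshold").explain()
{code}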






[jira] [Updated] (SPARK-24859) Predicates pushdown on outer joins

2018-07-20 Thread Johannes Mayer (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes Mayer updated SPARK-24859:
---
Description: 
I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. Now I want to join both tables on their id, but only for some of the partitions.

If I use an inner join, everything works well:

 
{code:java}
select *
from FAct f
join DIm d
on (f.id = d.id and f.part_col = d.part_col)
where f.part_col = 'xyz'
{code}
 

In the SQL explain plan I can see that the predicate part_col = 'xyz' is also used in the DIm HiveTableScan.

 

When I execute the same query using a left join, the full DIm table is scanned. There are some workarounds for this issue, but I wanted to report this as a bug, since it works with an inner join, and I think the behaviour should be the same for an outer join.

Here is a self-contained example (created in Zeppelin):

 
{code:java}
val fact = Seq((1, 100), (2, 200), (3, 100), (4, 200)).toDF("id", "part_col")
val dim = Seq((1, 100), (2, 200)).toDF("id", "part_col")
fact.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/fact")
dim.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/dim")

spark.sqlContext.sql("create table if not exists fact(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/fact'")
spark.sqlContext.sql("msck repair table fact")
spark.sqlContext.sql("create table if not exists dim(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/dim'")
spark.sqlContext.sql("msck repair table dim")
{code}
 
  
  
 *Inner join example:*
{code:java}
select * from fact f
join dim d
on (f.id = d.id
and f.part_col = d.part_col)
where f.part_col = 100{code}
Excerpt from Spark-SQL physical explain plan: 
{code:java}
HiveTableScan [id#411, part_col#412], CatalogRelation `default`.`fact`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#411], [part_col#412], 
[isnotnull(part_col#412), (part_col#412 = 100)] 
HiveTableScan [id#413, part_col#414], CatalogRelation `default`.`dim`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#413], [part_col#414], 
[isnotnull(part_col#414), (part_col#414 = 100)]{code}
 
 *Outer join example:*
{code:java}
select * from fact f
left join dim d
on (f.id = d.id
and f.part_col = d.part_col)
where f.part_col = 100{code}
 
 Excerpt from Spark-SQL physical explain plan:
  
{code:java}
HiveTableScan [id#426, part_col#427], CatalogRelation `default`.`fact`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#426], [part_col#427], 
[isnotnull(part_col#427), (part_col#427 = 100)]   
HiveTableScan [id#428, part_col#429], CatalogRelation `default`.`dim`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#428], [part_col#429] {code}
 
  

As you can see, the predicate is not pushed down to the HiveTableScan of the dim table for the outer join.
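
Until this is addressed, one common workaround (shown here as a sketch, not an official fix) is to repeat the partition predicate for the null-producing side inside the join condition, where it can restrict the dim scan without changing the left join semantics:

{code:java}
select * from fact f
left join dim d
on (f.id = d.id
and f.part_col = d.part_col
and d.part_col = 100)
where f.part_col = 100
{code}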

 

[jira] [Commented] (SPARK-24859) Predicates pushdown on outer joins

2018-07-20 Thread Johannes Mayer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550541#comment-16550541
 ] 

Johannes Mayer commented on SPARK-24859:


OK, I added the example.

> Predicates pushdown on outer joins
> --
>
> Key: SPARK-24859
> URL: https://issues.apache.org/jira/browse/SPARK-24859
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.0
> Environment: Cloudera CDH 5.13.1
>Reporter: Johannes Mayer
>Priority: Major
>






[jira] [Updated] (SPARK-24859) Predicates pushdown on outer joins

2018-07-20 Thread Johannes Mayer (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes Mayer updated SPARK-24859:
---
Description: 
I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. Now I want to join both tables on their id, but only for some of the partitions.

If I use an inner join, everything works well:

 
{code:java}
select *
from FAct f
join DIm d
on (f.id = d.id and f.part_col = d.part_col)
where f.part_col = 'xyz'
{code}
 

In the SQL explain plan I can see that the predicate part_col = 'xyz' is also used in the DIm HiveTableScan.

 

When I execute the same query using a left join, the full DIm table is scanned. There are some workarounds for this issue, but I wanted to report this as a bug, since it works with an inner join, and I think the behaviour should be the same for an outer join.

Here is a self-contained example (created in Zeppelin):

 
{code:java}
val fact = Seq((1, 100), (2, 200), (3, 100), (4, 200)).toDF("id", "part_col")
val dim = Seq((1, 100), (2, 200)).toDF("id", "part_col")
fact.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/fact")
dim.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/dim")

spark.sqlContext.sql("create table if not exists fact(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/fact'")
spark.sqlContext.sql("msck repair table fact")
spark.sqlContext.sql("create table if not exists dim(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/dim'")
spark.sqlContext.sql("msck repair table dim")
{code}
 
 
 
*Inner join example:*
{code:java}
select * from fact f
join dim d
on (f.id = d.id
and f.part_col = d.part_col)
where f.part_col = 100{code}
Excerpt from Spark-SQL physical explain plan: 
{code:java}
HiveTableScan [id#411, part_col#412], CatalogRelation `default`.`fact`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#411], [part_col#412], 
[isnotnull(part_col#412), (part_col#412 = 100)] 
HiveTableScan [id#413, part_col#414], CatalogRelation `default`.`dim`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#413], [part_col#414], 
[isnotnull(part_col#414), (part_col#414 = 100)]{code}
 
*Outer join example:*
{code:java}
select * from fact f
left join dim d
on (f.id = d.id
and f.part_col = d.part_col)
where f.part_col = 100{code}
 
Excerpt from Spark-SQL physical explain plan:
 
{code:java}
HiveTableScan [id#426, part_col#427], CatalogRelation `default`.`fact`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#426], [part_col#427], 
[isnotnull(part_col#427), (part_col#427 = 100)]   
HiveTableScan [id#428, part_col#429], CatalogRelation `default`.`dim`, 
org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#428], [part_col#429] {code}
 
 

As you can see, the predicate is not pushed down to the HiveTableScan of the dim table for the outer join.

 

  was:
I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. Now I want to join both tables on their id, but only for some of the partitions.

If I use an inner join, everything works well:

 
{code:java}
select *
from FAct f
join DIm d
on (f.id = d.id and f.part_col = d.part_col)
where f.part_col = 'xyz'
{code}
 

In the SQL explain plan I can see that the predicate part_col = 'xyz' is also used in the DIm HiveTableScan.

 

When I execute the same query using a left join, the full DIm table is scanned. There are some workarounds for this issue, but I wanted to report this as a bug, since it works with an inner join, and I think the behaviour should be the same for an outer join.

 

 



[jira] [Commented] (SPARK-24859) Predicates pushdown on outer joins

2018-07-20 Thread Johannes Mayer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550359#comment-16550359
 ] 

Johannes Mayer commented on SPARK-24859:


I will provide an example. Could you test it on the master branch?

> Predicates pushdown on outer joins
> --
>
> Key: SPARK-24859
> URL: https://issues.apache.org/jira/browse/SPARK-24859
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.0
> Environment: Cloudera CDH 5.13.1
>Reporter: Johannes Mayer
>Priority: Major
>






[jira] [Updated] (SPARK-24859) Predicates pushdown on outer joins

2018-07-19 Thread Johannes Mayer (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes Mayer updated SPARK-24859:
---
Description: 
I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. Now I want to join both tables on their id, but only for some of the partitions.

If I use an inner join, everything works well:

 
{code:java}
select *
from FAct f
join DIm d
on (f.id = d.id and f.part_col = d.part_col)
where f.part_col = 'xyz'
{code}
 

In the SQL explain plan I can see that the predicate part_col = 'xyz' is also used in the DIm HiveTableScan.

 

When I execute the same query using a left join, the full DIm table is scanned. There are some workarounds for this issue, but I wanted to report this as a bug, since it works with an inner join, and I think the behaviour should be the same for an outer join.

 

 



> Predicates pushdown on outer joins
> --
>
> Key: SPARK-24859
> URL: https://issues.apache.org/jira/browse/SPARK-24859
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.0
> Environment: Cloudera CDH 5.13.1
>Reporter: Johannes Mayer
>Priority: Critical
>






[jira] [Created] (SPARK-24859) Predicates pushdown on outer joins

2018-07-19 Thread Johannes Mayer (JIRA)
Johannes Mayer created SPARK-24859:
--

 Summary: Predicates pushdown on outer joins
 Key: SPARK-24859
 URL: https://issues.apache.org/jira/browse/SPARK-24859
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 2.2.0
 Environment: Cloudera CDH 5.13.1
Reporter: Johannes Mayer


I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. Now I want to join both tables on their id, but only for some of the partitions.

If I use an inner join, everything works well:

 
{code:java}
select *
from FAct f
join DIm d
on (f.id = d.id and f.part_col = d.part_col)
where f.part_col = 'xyz'
{code}
 

In the SQL explain plan I can see that the predicate part_col = 'xyz' is also used in the DIm HiveTableScan.

 

When I execute the same query using a left join, the full DIm table is scanned. There are some workarounds for this issue, but I wanted to report this as a bug, since it works with an inner join, and I think the behaviour should be the same for an outer join.

 

 






[jira] [Updated] (SPARK-23771) Uneven Rowgroup size after repartition

2018-05-14 Thread Johannes Mayer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes Mayer updated SPARK-23771:
---
Affects Version/s: 2.2.0

> Uneven Rowgroup size after repartition
> --
>
> Key: SPARK-23771
> URL: https://issues.apache.org/jira/browse/SPARK-23771
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, Shuffle, SQL
>Affects Versions: 1.6.0, 2.2.0
> Environment: Cloudera CDH 5.13.1
>Reporter: Johannes Mayer
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[jira] [Commented] (SPARK-23771) Uneven Rowgroup size after repartition

2018-05-14 Thread Johannes Mayer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474326#comment-16474326
 ] 

Johannes Mayer commented on SPARK-23771:


I have tested it with Spark 2.2.0 and the issue still exists.

> Uneven Rowgroup size after repartition
> --
>
> Key: SPARK-23771
> URL: https://issues.apache.org/jira/browse/SPARK-23771
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, Shuffle, SQL
>Affects Versions: 1.6.0
> Environment: Cloudera CDH 5.13.1
>Reporter: Johannes Mayer
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[jira] [Created] (SPARK-23771) Uneven Rowgroup size after repartition

2018-03-22 Thread Johannes Mayer (JIRA)
Johannes Mayer created SPARK-23771:
--

 Summary: Uneven Rowgroup size after repartition
 Key: SPARK-23771
 URL: https://issues.apache.org/jira/browse/SPARK-23771
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, Shuffle, SQL
Affects Versions: 1.6.0
 Environment: Cloudera CDH 5.13.1
Reporter: Johannes Mayer


I have a Hive table on Avro files that I want to read and store as partitioned Parquet files (one file per partition).

What I do is:
{code:java}
// read the AVRO table and distribute by the partition column
val data = sql("select * from avro_table distribute by part_col")

// write data as partitioned parquet files
data.write.partitionBy("part_col").parquet("output/path/")
{code}
 

I get one file per partition, as expected, but I often run into OutOfMemory errors. Investigating the issue, I found that some row groups are very big, and since all data of a row group is held in memory before it is flushed to disk, I think this causes the OutOfMemory errors. Other row groups are very small, containing almost no data. See the output from parquet-tools meta:

 
{code:java}
row group 1: RC:5740100 TS:566954562 OFFSET:4 
row group 2: RC:33769 TS:2904145 OFFSET:117971092 
row group 3: RC:31822 TS:2772650 OFFSET:118905225 
row group 4: RC:29854 TS:2704127 OFFSET:119793188 
row group 5: RC:28050 TS:2356729 OFFSET:120660675 
row group 6: RC:26507 TS:2111983 OFFSET:121406541 
row group 7: RC:25143 TS:1967731 OFFSET:122069351 
row group 8: RC:23876 TS:1991238 OFFSET:122682160 
row group 9: RC:22584 TS:2069463 OFFSET:123303246 
row group 10: RC:21225 TS:1955748 OFFSET:123960700 
row group 11: RC:19960 TS:1931889 OFFSET:124575333 
row group 12: RC:18806 TS:1725871 OFFSET:125132862 
row group 13: RC:17719 TS:1653309 OFFSET:125668057 
row group 14: RC:1617743 TS:157973949 OFFSET:134217728{code}
 

One thing to notice is that this file was written by a Spark application running on 13 executors. Is it possible that local data ends up in the big row group and the remote reads go into separate (small) row groups? A shuffle is involved, because the data is read with a distribute by clause.

 

Is this a known bug? Is there a workaround to get even row group sizes? I want to decrease the row group size using sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024), as sketched below.
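
For reference, a minimal sketch of that workaround (assuming an active SparkContext sc and the table from above); the block size must be set on the Hadoop configuration before the write runs:

{code:java}
// Cap the Parquet row group (block) size at 64 MiB so that no single row
// group buffers hundreds of megabytes on the heap before being flushed.
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)

// Same write as above, now with bounded row group sizes.
val data = sql("select * from avro_table distribute by part_col")
data.write.partitionBy("part_col").parquet("output/path/")
{code}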

 

 

 

 

 






[jira] [Created] (SPARK-23201) Cannot create view when duplicate columns exist in subquery

2018-01-24 Thread Johannes Mayer (JIRA)
Johannes Mayer created SPARK-23201:
--

 Summary: Cannot create view when duplicate columns exist in 
subquery
 Key: SPARK-23201
 URL: https://issues.apache.org/jira/browse/SPARK-23201
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: Johannes Mayer


I have two tables, A(colA, col2, col3) and B(colB, col3, col5).

If I join them in a subquery on A.colA = B.colB, I can select the non-duplicate columns, but I cannot create a view (col3 is duplicated, although it is not selected):

 
{code:java}
create view testview as select
tmp.colA, tmp.col2, tmp.colB, tmp.col5
from (
select * from A left join B
on (A.colA = B.colB)
) tmp
{code}
 

 

This works:

 
{code:java}
select
tmp.colA, tmp.col2, tmp.colB, tmp.col5
from (
select * from A left join B
on (A.colA = B.colB)
) tmp
{code}
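
A possible workaround (a sketch; the a_col3/b_col3 aliases are illustrative): alias or drop the duplicate columns inside the subquery so that the view schema contains no ambiguous names:

{code:java}
create view testview as select
tmp.colA, tmp.col2, tmp.colB, tmp.col5
from (
select A.colA, A.col2, A.col3 as a_col3,
B.colB, B.col3 as b_col3, B.col5
from A left join B
on (A.colA = B.colB)
) tmp
{code}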
 

 


