[ https://issues.apache.org/jira/browse/SPARK-25164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611331#comment-16611331 ]

Bruce Robbins commented on SPARK-25164:
---------------------------------------

Thanks [~Tagar] for the feedback. I assume the 44% improvement was for a table 
with lots of file splits.

I have some thoughts on this issue, but fetching 70k rows from a wide table that 
itself has only 70k rows should be reasonably fast. With a table like that, I can 
fetch all 70k rows on my laptop in under a minute.

Fetching 70k rows from a table that has, say, 10 million rows can be pretty 
poky, even with this fix.

I have theories (or partially informed speculation) about why this is. Here is 
the gist:

Let's say
 * your projection includes every column of a wide table (i.e., {{select *}})
 * you are filtering away most rows from a large table (e.g., {{select * from 
table where id1 = 1}}, which would fetch, say, only 0.2% of the rows)
 * matching rows are sprinkled fairly evenly throughout the table

In this case, Spark ends up reading (potentially) every data page from the 
parquet file, and realizing each wide row in memory, just to pass the row to 
Spark's filter operator so it can be (most likely) discarded.

This is true even when Spark pushes down the filter to the parquet reader.

This is because the matching rows are sprinkled evenly throughout the table, so 
(potentially) every data page for column id1 has at least one entry where value 
= 1. When a page has even a single matching entry, Spark realizes all the rows 
associated with that page.
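
To make the page-level effect concrete, here is a toy Scala sketch (page size, 
row count, and match counts are assumed, not measured) of how many data pages a 
page-skipping reader could avoid when matches are scattered versus clumped:

{noformat}
// Toy model only, not Spark code: a data page can be skipped only if it
// contains no matching row at all.
def pagesTouched(matchingRows: Seq[Int], pageSize: Int): Int =
  matchingRows.map(_ / pageSize).distinct.size

val totalRows  = 1000000
val pageSize   = 10000                // rows per data page (assumed)
val numMatches = 2000                 // ~0.2% of rows match the filter
val totalPages = totalRows / pageSize

// Matches sprinkled evenly: roughly one match every 500 rows.
val scattered = (0 until numMatches).map(_ * (totalRows / numMatches))
// Matches clumped together, as in a table sorted on the filter column.
val clustered = 0 until numMatches

println(s"scattered: ${pagesTouched(scattered, pageSize)} of $totalPages pages read")
println(s"clustered: ${pagesTouched(clustered, pageSize)} of $totalPages pages read")
{noformat}

In the scattered case the filter value shows up in every page, so nothing can be 
skipped and every wide row behind those pages gets realized; in the clumped case 
only a handful of pages are ever read.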

Realizing very wide rows in memory seems to be somewhat expensive, according to 
my profiling. I am not sure yet what part of realizing the rows in memory is 
expensive.

If the matching rows tend to be clumped together in one part of the table (say, 
the table is sorted on id1), most data pages will not contain matching rows. 
Spark can skip reading most data pages, and therefore avoid realizing most rows 
in memory. In that case, the query is much faster: in a test I just ran, my 
query on the sorted table was 3 times faster. The filter push-down code seems 
to be intended for cases like this.
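
For reference, a minimal spark-shell sketch (table name, filter column, and 
output path are assumed, borrowed from the reproducer in the description below) 
of rewriting a table sorted on the filter column so that matching rows land in 
a small number of pages:

{noformat}
// Rewrite the data sorted on the filter column so that matching rows
// cluster into a few row groups/pages (names and path are assumptions).
spark.table("6000_1m_double")
  .sort("id1")
  .write
  .mode("overwrite")
  .parquet("/tmp/6000_1m_double_sorted")

// The same sparse filter can now skip most of the data.
spark.read.parquet("/tmp/6000_1m_double_sorted")
  .filter("id1 = 1")
  .collect()
{noformat}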

{{select * from table limit nnnn}} is also slow (although not as slow as 
filtering down to the same number of records), but I have not looked into why.

> Parquet reader builds entire list of columns once for each column
> -----------------------------------------------------------------
>
>                 Key: SPARK-25164
>                 URL: https://issues.apache.org/jira/browse/SPARK-25164
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Minor
>             Fix For: 2.2.3, 2.3.2, 2.4.0
>
>
> {{VectorizedParquetRecordReader.initializeInternal}} loops through each 
> column, and for each column it calls
> {noformat}
> requestedSchema.getColumns().get(i)
> {noformat}
> However, {{MessageType.getColumns}} will build the entire column list from 
> {{getPaths(0)}}.
> {noformat}
>   public List<ColumnDescriptor> getColumns() {
>     List<String[]> paths = this.getPaths(0);
>     List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
>     for (String[] path : paths) {
>       // TODO: optimize this
>       PrimitiveType primitiveType = getType(path).asPrimitiveType();
>       columns.add(new ColumnDescriptor(
>                       path,
>                       primitiveType,
>                       getMaxRepetitionLevel(path),
>                       getMaxDefinitionLevel(path)));
>     }
>     return columns;
>   }
> {noformat}
> This means that for each parquet file, this routine indirectly iterates 
> colCount*colCount times.
> This is actually not particularly noticeable unless you have:
>  - many parquet files
>  - many columns
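> As a rough back-of-the-envelope Scala sketch (using the column and file counts 
> from the test below; an illustration, not a measurement):
> {noformat}
> // getColumns() rebuilds a colCount-sized list on every loop iteration,
> // so the reader does ~colCount * colCount descriptor constructions per
> // file instead of colCount.
> val colCount = 6000L
> val numFiles = 67L
> println(s"uncached, per file:  ${colCount * colCount}")             // 36,000,000
> println(s"cached,   per file:  $colCount")                          // 6,000
> println(s"uncached, all files: ${colCount * colCount * numFiles}")  // ~2.4 billion
> {noformat}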
> To verify that this is an issue, I created a 1 million record parquet table 
> with 6000 columns of type double and 67 files (so initializeInternal is 
> called 67 times). I ran the following query:
> {noformat}
> sql("select * from 6000_1m_double where id1 = 1").collect
> {noformat}
> I used Spark from the master branch. I had 8 executor threads. The filter 
> returns only a few thousand records. The query ran (on average) for 6.4 
> minutes.
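> A hypothetical Scala sketch (column values, value distribution, and output path 
> are assumed) of how a similarly shaped test table could be generated:
> {noformat}
> import org.apache.spark.sql.functions._
>
> // 1M rows: id1 plus 5,999 more double columns, written as ~67 files.
> // id1 takes a few hundred distinct values, so a point filter keeps only
> // a few thousand rows.
> val wideCols = (2 to 6000).map(i => (rand() * 100).as(s"id$i"))
> spark.range(1000000L)
>   .select((col("id") % 500).cast("double").as("id1") +: wideCols: _*)
>   .repartition(67)
>   .write.mode("overwrite")
>   .parquet("/tmp/6000_1m_double")
>
> spark.read.parquet("/tmp/6000_1m_double")
>   .createOrReplaceTempView("6000_1m_double")
> {noformat}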
> Then I cached the column list at the top of {{initializeInternal}} as follows:
> {noformat}
> List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
> {noformat}
> Then I changed {{initializeInternal}} to use {{columnCache}} rather than 
> {{requestedSchema.getColumns()}}.
> With the column cache variable, the same query runs in 5 minutes. So with my 
> simple query, you save 22% of the time by not rebuilding the column list for 
> each column.
> You get additional savings with a paths cache variable, now saving 34% in 
> total on the above query.


