[ https://issues.apache.org/jira/browse/SPARK-25164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611331#comment-16611331 ]
Bruce Robbins commented on SPARK-25164:
---------------------------------------

Thanks [~Tagar] for the feedback. I assume the 44% improvement was for a table with lots of file splits.

I have thoughts on this issue, but fetching 70k rows from a wide table that itself has only 70k rows should be somewhat fast. With a table like that, I can fetch 70k rows on my laptop in under a minute. Fetching 70k rows from a table that has, say, 10 million rows can be pretty poky, even with this fix.

I have theories (or partially informed speculation) about why this is. Here is the gist. Let's say:
* your projection includes every column of a wide table (i.e., {{select *}})
* you are filtering away most rows from a large table (e.g., {{select * from table where id1 = 1}}, which would fetch, say, only 0.2% of the rows)
* matching rows are sprinkled fairly evenly throughout the table

In this case, Spark ends up reading (potentially) every data page from the Parquet file and realizing each wide row in memory, just to pass the row to Spark's filter operator so it can be (most likely) discarded. This is true even when Spark pushes down the filter to the Parquet reader: because the matching rows are sprinkled evenly throughout the table, (potentially) every data page for column id1 has at least one entry where value = 1, and when a page has even a single matching entry, Spark realizes all the rows associated with that page. Realizing very wide rows in memory seems to be somewhat expensive, according to my profiling. I am not sure yet which part of realizing the rows in memory is expensive.

If the matching rows tend to be clumped together in one part of the table (say, the table is sorted on id1), most data pages will not contain matching rows. Spark can skip reading most data pages, and therefore avoid realizing most rows in memory. In that case, the query will be much faster: in a test I just ran, my query on the sorted table was 3 times faster.
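The page-skipping effect described above can be sketched with a toy simulation (plain Python, not Spark or Parquet code; the page size, row count, and match rate are made-up illustration values): a page can be skipped only when no row in it matches, so matches spread evenly across pages force every page to be decoded, while clustered matches let almost every page be skipped.

```python
# Toy model of page-level filter pushdown. A "page" here is just a
# fixed-size slice of a column; a page must be fully decoded (all of
# its rows realized) if it contains at least one matching value.

PAGE_SIZE = 1000

def pages_to_read(id1_column, predicate_value, page_size=PAGE_SIZE):
    """Count pages containing at least one match; only these pages
    (and all rows in them) get decoded by the reader."""
    pages = [id1_column[i:i + page_size]
             for i in range(0, len(id1_column), page_size)]
    return sum(1 for page in pages if predicate_value in page)

n_rows = 100_000
# Scattered: value 1 appears once per 500 rows, spread evenly.
scattered = [1 if i % 500 == 0 else 0 for i in range(n_rows)]
# Clustered: the same 200 matching rows, grouped together as if the
# table were sorted on id1.
clustered = sorted(scattered, reverse=True)

total_pages = n_rows // PAGE_SIZE
print(pages_to_read(scattered, 1), "of", total_pages)  # every page hit
print(pages_to_read(clustered, 1), "of", total_pages)  # one page hit
```

Both layouts return the same 200 rows, but the scattered layout decodes all 100 pages while the clustered layout decodes just one, which mirrors the 3x speedup observed on the sorted table.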
The filter push-down code seems to be intended for cases like this.

{{select * from table limit nnnn}} is also slow, although not as slow as filtering the same number of records, but I have not looked into why.

> Parquet reader builds entire list of columns once for each column
> -----------------------------------------------------------------
>
>                 Key: SPARK-25164
>                 URL: https://issues.apache.org/jira/browse/SPARK-25164
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Minor
>             Fix For: 2.2.3, 2.3.2, 2.4.0
>
>
> {{VectorizedParquetRecordReader.initializeInternal}} loops through each column, and for each column it calls
> {noformat}
> requestedSchema.getColumns().get(i)
> {noformat}
> However, {{MessageType.getColumns}} will build the entire column list from getPaths(0).
> {noformat}
>   public List<ColumnDescriptor> getColumns() {
>     List<String[]> paths = this.getPaths(0);
>     List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
>     for (String[] path : paths) {
>       // TODO: optimize this
>       PrimitiveType primitiveType = getType(path).asPrimitiveType();
>       columns.add(new ColumnDescriptor(
>           path,
>           primitiveType,
>           getMaxRepetitionLevel(path),
>           getMaxDefinitionLevel(path)));
>     }
>     return columns;
>   }
> {noformat}
> This means that for each parquet file, this routine indirectly iterates colCount*colCount times.
> This is actually not particularly noticeable unless you have:
> - many parquet files
> - many columns
> To verify that this is an issue, I created a 1 million record parquet table with 6000 columns of type double and 67 files (so initializeInternal is called 67 times). I ran the following query:
> {noformat}
> sql("select * from 6000_1m_double where id1 = 1").collect
> {noformat}
> I used Spark from the master branch. I had 8 executor threads. The filter returns only a few thousand records.
> The query ran (on average) for 6.4 minutes.
> Then I cached the column list at the top of {{initializeInternal}} as follows:
> {noformat}
> List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
> {noformat}
> Then I changed {{initializeInternal}} to use {{columnCache}} rather than {{requestedSchema.getColumns()}}.
> With the column cache variable, the same query runs in 5 minutes. So with my simple query, you save 22% of time by not rebuilding the column list for each column.
> You get additional savings with a paths cache variable, now saving 34% in total on the above query.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
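The quadratic behavior and the caching fix described in the issue can be sketched with a small simulation (plain Python with a hypothetical {{Schema}} class, not the actual parquet-mr Java classes; the counter stands in for the per-path {{ColumnDescriptor}} construction work):

```python
# Toy model of SPARK-25164: initializeInternal called getColumns()
# once per column, and getColumns() itself walks every column path,
# so initialization did colCount * colCount units of work per file.

class Schema:
    """Hypothetical stand-in for parquet-mr's MessageType."""

    def __init__(self, n_cols):
        self.paths = [f"col_{i}" for i in range(n_cols)]
        self.descriptor_builds = 0  # counts per-path descriptor work

    def get_columns(self):
        # Mirrors MessageType.getColumns(): rebuilds the whole list
        # of column descriptors from scratch on every call.
        cols = []
        for path in self.paths:
            self.descriptor_builds += 1
            cols.append(path)
        return cols

def init_uncached(schema):
    # Pre-fix: rebuilds the full column list once per column.
    for i in range(len(schema.paths)):
        schema.get_columns()[i]

def init_cached(schema):
    # Post-fix: build the list once and index into the cached copy.
    cache = schema.get_columns()
    for i in range(len(schema.paths)):
        cache[i]

n = 600  # scaled down from the 6000-column repro table
s1, s2 = Schema(n), Schema(n)
init_uncached(s1)
init_cached(s2)
print(s1.descriptor_builds)  # n * n = 360000
print(s2.descriptor_builds)  # n = 600
```

At the repro's 6000 columns this is 36 million descriptor builds per file versus 6000, and with 67 files the waste multiplies, which is consistent with the 22% saving the one-line cache produced.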