Hey Hyukjin,

Sorry that I missed the JIRA ticket. Thanks for bringing this issue up here, and for your detailed investigation.

From my side, I think this is a bug in Parquet. Parquet was designed to support schema evolution. When scanning a Parquet file, if a column exists in the requested schema but is missing from the file schema, that column is filled with nulls. The same should hold for pushed-down predicate filters. For example, if the filter "a = 1" is pushed down but column "a" doesn't exist in the Parquet file being scanned, it's safe to assume "a" is null in all records and drop all of them. Conversely, if "a IS NULL" is pushed down, all records should be preserved.
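
To illustrate the expected semantics, here is a minimal sketch (Scala, spark-shell style; the paths and column names are made up, not taken from the ticket):

    import sqlContext.implicits._

    // Two part-files with different schemas: the second one has no column "a".
    Seq((1, "x"), (2, "y")).toDF("a", "b").write.parquet("/tmp/t/part1")
    Seq(Tuple1("z")).toDF("b").write.parquet("/tmp/t/part2")

    val df = sqlContext.read.option("mergeSchema", "true")
      .parquet("/tmp/t/part1", "/tmp/t/part2")

    // Expected: "a = 1" drops every record from part2 (where "a" is null),
    // while "a IS NULL" keeps exactly those records.
    df.filter("a = 1").show()
    df.filter("a IS NULL").show()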

In any case, until this issue is properly fixed on the Parquet side, we need to work around it on the Spark side. Please see my comments on all three of your solutions inlined below. In short, I'd like to have approach 1 for branch-1.5 and approach 2 for master.

Cheng

On 10/28/15 10:11 AM, Hyukjin Kwon wrote:
When schema merging (mergeSchema) and predicate filter push-down are both enabled, this fails because Parquet filters are pushed down regardless of the schema of each split (or rather, of each file).

Dominic Ricard reported this issue (https://issues.apache.org/jira/browse/SPARK-11103)

Even though this works okay when spark.sql.parquet.filterPushdown is set to false, its default value is true, so this looks like an issue.
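
(For reference, the workaround mentioned above is simply the following, using the public setConf API:)

    // Disable Parquet filter push-down until this is fixed.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")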

My questions are:
is this clearly an issue?
and if so, how should it be handled?


I believe this is an issue, so I made three rough patches, tested them, and all of them look fine.

The first approach looks simplest and most appropriate, judging from previous fixes such as https://issues.apache.org/jira/browse/SPARK-11153. However, in terms of safety and performance, I'd like to confirm which one is the proper approach before opening a PR.

1. Simply set spark.sql.parquet.filterPushdown to false when mergeSchema is used.
This one is pretty simple and safe; I'd like to have it in 1.5.2, or 1.5.3 if we can't make it into 1.5.2.
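
Roughly, what I have in mind here is just gating the push-down decision on schema merging. This is only a sketch; `shouldMergeSchemas` and the config accessor are stand-ins for wherever ParquetRelation actually makes this decision:

    // A rough sketch of approach 1; the names below approximate the real ones
    // in ParquetRelation / SQLConf and are not the actual code.
    val pushDownEnabled =
      sqlContext.conf.parquetFilterPushDown && !shouldMergeSchemas

    if (pushDownEnabled) {
      // only then convert Catalyst filters into Parquet FilterPredicates
    }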

2. If spark.sql.parquet.filterPushdown is true, retrieve the schemas of all the part-files (and also the merged one), check whether each of them can accept the given filter, and apply the filter only when they all can, which I think is a bit over-engineered.
Actually we only need to calculate the intersection of all file schemata. We can make ParquetRelation.mergeSchemaInParallel return two StructTypes: the first is the original merged schema, and the other is the intersection of all file schemata, which only contains fields that exist in every file schema. Then we decide which filters to push down according to the second StructType.
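
As a sketch of the intersection part (assuming the individual file schemas have already been collected as `fileSchemata`; the helper name is made up):

    import org.apache.spark.sql.types.StructType

    // Keep only fields that appear (by name) in every file schema.  A real
    // implementation would also have to respect case-sensitivity settings.
    def intersectSchemata(fileSchemata: Seq[StructType]): StructType =
      fileSchemata.reduce { (left, right) =>
        StructType(left.filter(f => right.exists(_.name == f.name)))
      }

    // Filters that reference only columns in this intersection are safe to
    // push down; everything else is left for Spark SQL to evaluate.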

3. If spark.sql.parquet.filterPushdown is true, retrieve the schemas of all the part-files (and also the merged one) and apply the filter only to the splits (or rather files) whose schemas can accept it, which I think is hacky, since it ends up with different configurations for each task in a job.
The idea I came up with at first was similar to this one. Instead of pulling all file schemata to the driver side, we can push the filter push-down decision to the executor side, namely passing the candidate filters to the executors and computing the Parquet predicate filter according to each file schema. I haven't looked into this direction in depth, but we can probably put this part into CatalystReadSupport, which is now initialized on the executor side (a rough sketch follows at the end of this item).

However, the correctness of this approach can only be guaranteed by the defensive filtering we do in Spark SQL (i.e., applying all the filters no matter whether they are pushed down or not), and we are considering removing that because it imposes unnecessary performance cost. This makes me hesitant to go down this route.
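
To make the executor-side idea concrete, here is a sketch of the per-file filter selection. The helpers are hypothetical, only cover a few filter shapes, and leave out how the candidate filters would reach the executors:

    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types.StructType

    // Returns the columns a filter references, or None for shapes we don't
    // recognize (those are simply not pushed down).
    def referencedColumns(filter: Filter): Option[Seq[String]] = filter match {
      case EqualTo(attr, _) => Some(Seq(attr))
      case IsNull(attr)     => Some(Seq(attr))
      case IsNotNull(attr)  => Some(Seq(attr))
      case And(l, r) => for (a <- referencedColumns(l); b <- referencedColumns(r)) yield a ++ b
      case Or(l, r)  => for (a <- referencedColumns(l); b <- referencedColumns(r)) yield a ++ b
      case _ => None
    }

    // Keep only the filters whose columns all exist in the schema of the file
    // being read; this would run where the read support is initialized.
    def pushableFilters(candidates: Seq[Filter], fileSchema: StructType): Seq[Filter] =
      candidates.filter { f =>
        referencedColumns(f).exists(_.forall(fileSchema.fieldNames.contains))
      }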
