[ https://issues.apache.org/jira/browse/SPARK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977961#comment-14977961 ]

Cheng Lian edited comment on SPARK-11103 at 10/28/15 8:32 AM:
--------------------------------------------------------------

Quoted from my reply on the user list:

For 1: This one is pretty simple and safe; I'd like to have it in 1.5.2, or 
1.5.3 if we can't make it for 1.5.2.

For 2: I'd like to have this for Spark master. Actually, we only need to 
calculate the intersection of all file schemata. We can make 
ParquetRelation.mergeSchemaInParallel return two StructTypes: the first is the 
original merged schema, the second is the intersection of all file schemata, 
containing only the fields that exist in every file. Then we decide which 
filters to push down according to the second StructType, as sketched below.
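
A minimal sketch of that intersection step, assuming the file schemata have 
already been collected as Spark SQL StructTypes; the object and method names 
below are illustrative only, not the actual ParquetRelation API:

{noformat}
import org.apache.spark.sql.types.StructType

object SchemaIntersection {
  // Keep a field only when every file schema contains it with the same name
  // and data type. A real implementation would also have to reconcile
  // nullability and nested fields.
  def intersect(fileSchemata: Seq[StructType]): StructType = {
    require(fileSchemata.nonEmpty, "expected at least one file schema")
    val common = fileSchemata
      .map(_.fields.map(f => (f.name, f.dataType)).toSet)
      .reduce(_ intersect _)
    StructType(fileSchemata.head.fields.filter(f => common.contains((f.name, f.dataType))))
  }

  // A filter is a push-down candidate only if every column it references
  // exists in all files, i.e. in the intersection schema.
  def isPushable(referencedColumns: Set[String], intersection: StructType): Boolean =
    referencedColumns.subsetOf(intersection.fieldNames.toSet)
}
{noformat}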

For 3: The idea I came up with at first was similar to this one. Instead of 
pulling all file schemata to the driver side, we can push the filter push-down 
code to the executor side: pass the candidate filters to the executors and 
compute the Parquet filter predicates against each individual file schema. I 
haven't looked into this direction in depth, but we could probably put this 
part into CatalystReadSupport, which is now initialized on the executor side 
(see the sketch below). However, the correctness of this approach is only 
guaranteed by the defensive filtering we do in Spark SQL (i.e., applying all 
the filters whether or not they are pushed down), and we are considering 
removing that because it imposes unnecessary performance cost. This makes me 
hesitant to go down this path.
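
A rough sketch of what the executor-side conversion could look like, assuming 
the set of columns present in the file is read from its footer; IntEquals and 
buildPredicates are hypothetical stand-ins, not the actual CatalystReadSupport 
hook:

{noformat}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

object ExecutorSideFilters {
  // Simplified stand-in for a pushed-down equality filter on an INT column.
  case class IntEquals(column: String, value: Int)

  // Build Parquet predicates only for filters whose column exists in the file
  // currently being read; filters skipped here would still have to be
  // re-evaluated by Spark SQL afterwards (the "defensive filtering" above).
  def buildPredicates(
      candidates: Seq[IntEquals],
      columnsInFile: Set[String]): Seq[FilterPredicate] = {
    candidates.collect {
      case IntEquals(col, v) if columnsInFile.contains(col) =>
        FilterApi.eq(FilterApi.intColumn(col), Int.box(v))
    }
  }
}
{noformat}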

From my side, I think this is a bug in Parquet. Parquet was designed to 
support schema evolution: when scanning a Parquet file, if a column exists in 
the requested schema but is missing from the file schema, that column is 
filled with null. The same should hold for pushed-down filter predicates. For 
example, if the filter "a = 1" is pushed down but column "a" doesn't exist in 
the Parquet file being scanned, it's safe to assume "a" is null in all records 
and drop all of them. Conversely, if "a IS NULL" is pushed down, all records 
should be preserved.
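
For illustration only, the expected behaviour for a file that lacks the 
filtered column could be summarized like this (a sketch of the intended 
semantics, not Parquet's actual evaluation code):

{noformat}
object MissingColumnSemantics {
  sealed trait Decision
  case object DropAllRecords extends Decision // e.g. "a = 1"
  case object KeepAllRecords extends Decision // e.g. "a IS NULL"

  // Every record in a file that lacks the filtered column behaves as if the
  // column were null, so the whole file can be decided up front.
  def decide(predicateMatchesNull: Boolean): Decision =
    if (predicateMatchesNull) KeepAllRecords // "a IS NULL" matches null
    else DropAllRecords                      // "a = 1" never matches null
}
{noformat}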

Filed PARQUET-389 to track this issue.


was (Author: lian cheng):
Quoted from my reply on the user list:

For 1: This one is pretty simple and safe; I'd like to have it in 1.5.2, or 
1.5.3 if we can't make it for 1.5.2.

For 2: Actually, we only need to calculate the intersection of all file 
schemata. We can make ParquetRelation.mergeSchemaInParallel return two 
StructTypes: the first is the original merged schema, the second is the 
intersection of all file schemata, containing only the fields that exist in 
every file. Then we decide which filters to push down according to the second 
StructType.

For 3: The idea I came up with at first was similar to this one. Instead of 
pulling all file schemata to the driver side, we can push the filter push-down 
code to the executor side: pass the candidate filters to the executors and 
compute the Parquet filter predicates against each individual file schema. I 
haven't looked into this direction in depth, but we could probably put this 
part into CatalystReadSupport, which is now initialized on the executor side. 
However, the correctness of this approach is only guaranteed by the defensive 
filtering we do in Spark SQL (i.e., applying all the filters whether or not 
they are pushed down), and we are considering removing that because it imposes 
unnecessary performance cost. This makes me hesitant to go down this path.

From my side, I think this is a bug in Parquet. Parquet was designed to 
support schema evolution: when scanning a Parquet file, if a column exists in 
the requested schema but is missing from the file schema, that column is 
filled with null. The same should hold for pushed-down filter predicates. For 
example, if the filter "a = 1" is pushed down but column "a" doesn't exist in 
the Parquet file being scanned, it's safe to assume "a" is null in all records 
and drop all of them. Conversely, if "a IS NULL" is pushed down, all records 
should be preserved.

Filed PARQUET-389 to track this issue.

> Filter applied on merged Parquet schema with new column fails with 
> (java.lang.IllegalArgumentException: Column [column_name] was not found in 
> schema!)
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11103
>                 URL: https://issues.apache.org/jira/browse/SPARK-11103
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Dominic Ricard
>            Assignee: Hyukjin Kwon
>
> When evolving a schema in Parquet files, Spark properly exposes all columns 
> found in the different Parquet files, but when trying to query the data, it 
> is not possible to apply a filter on a column that is not present in all 
> files.
> To reproduce:
> *SQL:*
> {noformat}
> create table `table1` STORED AS PARQUET LOCATION 
> 'hdfs://<SERVER>:<PORT>/path/to/table/id=1/' as select 1 as `col1`;
> create table `table2` STORED AS PARQUET LOCATION 
> 'hdfs://<SERVER>:<PORT>/path/to/table/id=2/' as select 1 as `col1`, 2 as 
> `col2`;
> create table `table3` USING org.apache.spark.sql.parquet OPTIONS (path 
> "hdfs://<SERVER>:<PORT>/path/to/table");
> select col1 from `table3` where col2 = 2;
> {noformat}
> The last select will output the following Stack Trace:
> {noformat}
> An error occurred when executing the SQL command:
> select col1 from `table3` where col2 = 2
> [Simba][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 
> 0, SQL state: TStatus(statusCode:ERROR_STATUS, 
> infoMessages:[*org.apache.hive.service.cli.HiveSQLException:org.apache.spark.SparkException:
>  Job aborted due to stage failure: Task 0 in stage 7212.0 failed 4 times, 
> most recent failure: Lost task 0.3 in stage 7212.0 (TID 138449, 
> 208.92.52.88): java.lang.IllegalArgumentException: Column [col2] was not 
> found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:94)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:180)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:160)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:155)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace::26:25, 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:runInternal:SparkExecuteStatementOperation.scala:259,
>  
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:run:SparkExecuteStatementOperation.scala:144,
>  
> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:388,
>  
> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:HiveSessionImpl.java:369,
>  sun.reflect.GeneratedMethodAccessor134:invoke::-1, 
> sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43,
>  java.lang.reflect.Method:invoke:Method.java:497, 
> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78,
>  
> org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36,
>  
> org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63,
>  java.security.AccessController:doPrivileged:AccessController.java:-2, 
> javax.security.auth.Subject:doAs:Subject.java:422, 
> org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1628,
>  
> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59,
>  com.sun.proxy.$Proxy25:executeStatement::-1, 
> org.apache.hive.service.cli.CLIService:executeStatement:CLIService.java:261, 
> org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:486,
>  
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1313,
>  
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1298,
>  org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, 
> org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, 
> org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56,
>  
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:285,
>  
> java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142,
>  
> java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617,
>  java.lang.Thread:run:Thread.java:745], errorCode:0, 
> errorMessage:org.apache.spark.SparkException: Job aborted due to stage 
> failure: Task 0 in stage 7212.0 failed 4 times, most recent failure: Lost 
> task 0.3 in stage 7212.0 (TID 138449, 208.92.52.88): 
> java.lang.IllegalArgumentException: Column [col2] was not found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:94)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:180)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:160)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:155)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:), Query: select col1 from `table3` where col2 = 2. [SQL 
> State=HY000, DB Errorcode=500051]
> Execution time: 0.44s
> 1 statement failed.
> {noformat}


