[ 
https://issues.apache.org/jira/browse/SPARK-11428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14988337#comment-14988337
 ] 

Brad Willard commented on SPARK-11428:
--------------------------------------

As a work around. You can create a new parquet file from the schema merged data 
frame and then run the query on the merged parquet file. However it's not 
practical for large datasets and really should be fixed.

> Schema Merging Broken for Some Queries
> --------------------------------------
>
>                 Key: SPARK-11428
>                 URL: https://issues.apache.org/jira/browse/SPARK-11428
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 1.5.1
>         Environment: AWS,
>            Reporter: Brad Willard
>              Labels: dataframe, parquet, pyspark, schema, sparksql
>
> I have data being written into parquet format via spark streaming. The data 
> can change slightly so schema merging is required. I load a dataframe like 
> this
> {code}
> urls = [
>     "/streaming/parquet/events/key=2015-10-30*",
>     "/streaming/parquet/events/key=2015-10-29*"
> ]
> sdf = sql_context.read.option("mergeSchema", "true").parquet(*urls)
> sdf.registerTempTable('events')
> {code}
> If I print the schema you can see the contested column
> {code}
> sdf.printSchema()
> root
>  |-- _id: string (nullable = true)
> ...
>  |-- d__device_s: string (nullable = true)
>  |-- d__isActualPageLoad_s: string (nullable = true)
>  |-- d__landing_s: string (nullable = true)
>  |-- d__lang_s: string (nullable = true)
>  |-- d__os_s: string (nullable = true)
>  |-- d__performance_i: long (nullable = true)
>  |-- d__product_s: string (nullable = true)
>  |-- d__refer_s: string (nullable = true)
>  |-- d__rk_i: long (nullable = true)
>  |-- d__screen_s: string (nullable = true)
>  |-- d__submenuName_s: string (nullable = true)
> {code}
> The column that's in one but not the other file is  d__product_s
> So I'm able to run this query and it works fine.
> {code}
> sql_context.sql('''
>     select 
>         distinct(d__product_s) 
>     from 
>         events
>     where 
>         n = 'view'
> ''').collect()
> [Row(d__product_s=u'website'),
>  Row(d__product_s=u'store'),
>  Row(d__product_s=None),
>  Row(d__product_s=u'page')]
> {code}
> However if I instead use that column in the where clause things break.
> {code}
> sql_context.sql('''
>     select 
>         * 
>     from 
>         events
>     where 
>         n = 'view' and d__product_s = 'page'
> ''').take(1)
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-15-04698b649759> in <module>()
>       6     where
>       7         n = 'frontsite_view' and d__product_s = 'page'
> ----> 8 ''').take(1)
> /root/spark/python/pyspark/sql/dataframe.pyc in take(self, num)
>     303         with SCCallSiteSync(self._sc) as css:
>     304             port = 
> self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(
> --> 305                 self._jdf, num)
>     306         return list(_load_from_socket(port, 
> BatchedSerializer(PickleSerializer())))
>     307 
> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in 
> __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539 
>     540         for temp_arg in temp_args:
> /root/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
>      34     def deco(*a, **kw):
>      35         try:
> ---> 36             return f(*a, **kw)
>      37         except py4j.protocol.Py4JJavaError as e:
>      38             s = e.java_exception.toString()
> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 15.0 failed 30 times, most recent failure: Lost task 0.29 in stage 
> 15.0 (TID 6536, 10.X.X.X): java.lang.IllegalArgumentException: Column 
> [d__product_s] was not found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:94)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:180)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:131)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:308)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:160)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:155)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
>       at 
> org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>       at 
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
>       at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
>       at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
>       at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
>       at 
> org.apache.spark.sql.execution.EvaluatePython$.takeAndServe(python.scala:127)
>       at 
> org.apache.spark.sql.execution.EvaluatePython.takeAndServe(python.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>       at py4j.Gateway.invoke(Gateway.java:259)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:207)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: Column [d__product_s] was not 
> found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:94)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:180)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:131)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:308)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:160)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:155)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       ... 1 more
> {code}
> I get the same error also when attempting to write the same query with the 
> dataframe api as well.
> {code}
> sdf.where(sdf.d__product_s == 'page').take(1)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to