[ https://issues.apache.org/jira/browse/SPARK-27331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825874#comment-16825874 ]

Kineret commented on SPARK-27331:
---------------------------------

Setting the schema back to the full schema after every commit solved the issue.
Thanks!
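
For anyone hitting the same problem, here is a minimal sketch of that workaround. It reuses the MyMicroBatchReader / createSchema() shape from the snippet quoted below; everything else about the source is elided, so treat it as illustrative rather than exact:

{code:scala}
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.StructType

class MyMicroBatchReader(/* source-specific options */) extends MicroBatchReader
    with SupportsPushDownRequiredColumns {

  // Source-specific: builds the full schema of the stream.
  private def createSchema(): StructType = ???

  var schema: StructType = createSchema()

  override def readSchema(): StructType = schema

  // Spark pushes the pruned projection down here while planning a batch.
  override def pruneColumns(requiredSchema: StructType): Unit = {
    schema = requiredSchema
  }

  // The fix: once a micro-batch is committed, fall back to the full schema
  // so that planning of the next batch starts from all columns again.
  override def commit(end: Offset): Unit = {
    schema = createSchema()
  }

  // ... remaining MicroBatchReader methods (setOffsetRange, getStartOffset,
  // getEndOffset, deserializeOffset, createDataReaderFactories, stop) ...
}
{code}

The only functional change relative to the original snippet is the reset inside commit().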

> Schema mismatch using MicroBatchReader with columns pruning
> -----------------------------------------------------------
>
>                 Key: SPARK-27331
>                 URL: https://issues.apache.org/jira/browse/SPARK-27331
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>         Environment: spark 2.3.1
>            Reporter: Kineret
>            Priority: Major
>              Labels: datasource
>
> I'm writing a custom Spark streaming source. I want to support column
> pruning, so I did something like this:
>  
> {code:scala}
> class MyMicroBatchReader(...) extends MicroBatchReader
>     with SupportsPushDownRequiredColumns {
>
>   // Starts out as the full source schema.
>   var schema: StructType = createSchema()
>
>   def readSchema(): StructType = schema
>
>   // Spark pushes the pruned projection down here during planning.
>   def pruneColumns(requiredSchema: StructType): Unit = {
>     schema = requiredSchema
>   }
>
>   ...
> }
> {code}
>  
> If I run a streaming query that selects only some of the columns, the job
> fails. For example, running:
> {code:java}
> spark.readStream()
>   .format("mysource")
>   .load()
>   .select("Id")
>   .writeStream()
>   .format("console")
>   .start()
> {code}
> I get the following exception (on the second micro-batch):
> {code:java}
> 18/06/29 15:50:01 ERROR MicroBatchExecution: Query [id = 59c13195-9d63-42c9-8f92-eb9d67e8b26c, runId = 72124019-1ab3-48a9-9503-0cf1c7d26fb9] terminated with error
> java.lang.AssertionError: assertion failed: Invalid batch: fieldA#0,fieldB#1,fieldC#2,Id#3,fieldD#4,fieldE#5 != Id#52
>   at scala.Predef$.assert(Predef.scala:170)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:417)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:416)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:416)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:414)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:414)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {code}
> Can you please help?


