[ https://issues.apache.org/jira/browse/SPARK-27331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825874#comment-16825874 ]
Kineret commented on SPARK-27331:
---------------------------------

Setting the schema back to the full schema after every commit solved the issue. Thanks!
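For reference, a minimal sketch of that fix, assuming the reader caches the full schema in a {{fullSchema}} field (a name introduced here for illustration) and restores it in {{commit()}}; {{createSchema()}} is the reporter's own helper, and the signatures follow the Spark 2.3.x DataSourceV2 API:

{code:java}
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.StructType

abstract class MyMicroBatchReader extends MicroBatchReader
    with SupportsPushDownRequiredColumns {

  // Full schema of the source, computed once (as in the report below).
  private val fullSchema: StructType = createSchema()

  // Schema currently reported to Spark; pruneColumns() may narrow it.
  private var schema: StructType = fullSchema

  protected def createSchema(): StructType

  override def readSchema(): StructType = schema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    schema = requiredSchema
  }

  override def commit(end: Offset): Unit = {
    // The fix: after every commit, fall back to the full schema so the
    // next micro-batch is planned against the source's complete output.
    schema = fullSchema
  }

  // setOffsetRange, getStartOffset, getEndOffset, deserializeOffset,
  // createDataReaderFactories and stop() are unchanged and omitted here.
}
{code}

With this in place, {{readSchema()}} only reports the pruned schema while the current batch is running, so the "Invalid batch" assertion below no longer trips on the next batch.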
> Schema mismatch using MicroBatchReader with columns pruning
> -----------------------------------------------------------
>
>                 Key: SPARK-27331
>                 URL: https://issues.apache.org/jira/browse/SPARK-27331
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>         Environment: spark 2.3.1
>            Reporter: Kineret
>            Priority: Major
>              Labels: datasource
>
> I'm writing a custom Spark streaming source, and I want to support column pruning. I did something like this:
> {code:java}
> class MyMicroBatchReader(...) extends MicroBatchReader with SupportsPushDownRequiredColumns {
>   var schema: StructType = createSchema()
>
>   def readSchema(): StructType = schema
>
>   def pruneColumns(requiredSchema: StructType): Unit = {
>     schema = requiredSchema
>   }
>
>   ...
> }
> {code}
> If I run a streaming query that selects some columns, the job fails. For example, running:
> {code:java}
> spark.readStream().format("mysource").load().select("Id").writeStream().format("console").start()
> {code}
> I obtain the following exception (in the second iteration):
> {code:java}
> 18/06/29 15:50:01 ERROR MicroBatchExecution: Query [id = 59c13195-9d63-42c9-8f92-eb9d67e8b26c, runId = 72124019-1ab3-48a9-9503-0cf1c7d26fb9] terminated with error
> java.lang.AssertionError: assertion failed: Invalid batch: fieldA#0,fieldB#1,fieldC#2,Id#3,fieldD#4,fieldE#5 != Id#52
>   at scala.Predef$.assert(Predef.scala:170)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:417)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:416)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:416)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:414)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:414)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {code}
> Can you please help?
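For anyone who lands here with the same stack trace: a small local driver along these lines reproduces the failure. This is a sketch, not taken from the report; the object name is made up, {{mysource}} stands for whatever short name the custom source is registered under, and the master, trigger, and timeout are arbitrary choices for a local test.

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object Spark27331Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-27331 repro")
      .getOrCreate()

    // Same shape as the query in the report: selecting a subset of columns
    // triggers pruneColumns() on the reader.
    val query = spark.readStream
      .format("mysource") // assumed registration name of the custom source
      .load()
      .select("Id")
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    // Without the commit() reset this dies in the second micro-batch with
    // "assertion failed: Invalid batch: ... != Id#52"; with it, it runs on.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
{code}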