[ https://issues.apache.org/jira/browse/SPARK-39132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534134#comment-17534134 ]
XiDuo You commented on SPARK-39132: ----------------------------------- Same bug as SPARK-39104. > spark3.2.1 cache throw NPE > -------------------------- > > Key: SPARK-39132 > URL: https://issues.apache.org/jira/browse/SPARK-39132 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 3.2.1 > Environment: I run it with one driver and 2 executors; each executor is allocated 2g of > memory and the old generation usage rate is about 50%, so I think it is healthy > Reporter: cxb > Priority: Major > Original Estimate: 72h > Remaining Estimate: 72h > > After I upgraded the Spark version to 3.2.1, a job that has been running for about 1 day > throws the exception below. > gc log: > {code:java} > Heap > par new generation total 307840K, used 239453K [0x0000000080000000, > 0x0000000094e00000, 0x00000000aaaa0000) > eden space 273664K, 81% used [0x0000000080000000, 0x000000008da4bdd0, > 0x0000000090b40000) > from space 34176K, 46% used [0x0000000092ca0000, 0x0000000093c2b6b8, > 0x0000000094e00000) > to space 34176K, 0% used [0x0000000090b40000, 0x0000000090b40000, > 0x0000000092ca0000) > concurrent mark-sweep generation total 811300K, used 451940K > [0x00000000aaaa0000, 0x00000000dc2e9000, 0x0000000100000000) > Metaspace used 102593K, capacity 110232K, committed 121000K, reserved > 1155072K > class space used 12473K, capacity 13482K, committed 15584K, reserved > 1048576K {code} > code: > > {code:java} > sparkSession > .readStream > .format('kafka') > .load > .repartition(4) > ...project > .watermark > .groupby(k1, k2) > .agg(size(collect_set('xxx'))) > .writeStream > .foreachBatch(function test) > .start > def test:(Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => { > ds.persist(StorageLevel.MEMORY_AND_DISK_SER) > ds.write > .option("collection", s"col_1") > .option("maxBatchSize", "2048") > .mode("append") > .mongo() > ds..write > .option("collection", s"col_2") > .option("maxBatchSize", "2048") > .mode("append") > .mongo() > ds.unpersist() > }{code} > > > 
exception log > > {code:java} > {code} > 22/05/09 21:11:28 ERROR streaming.MicroBatchExecution: Query rydts_regist_gp > [id = 669c2031-71b2-422b-859d-336722d289e9, runId = > 049de32c-e6ff-48f1-8742-bb95122a36ea] terminated with error > java.lang.NullPointerException > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1(InMemoryRelation.scala:248) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1$adapted(InMemoryRelation.scala:247) > at > scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41) > at > scala.collection.IndexedSeqOptimized.forall(IndexedSeqOptimized.scala:46) > at > scala.collection.IndexedSeqOptimized.forall$(IndexedSeqOptimized.scala:46) > at scala.collection.mutable.ArrayOps$ofRef.forall(ArrayOps.scala:198) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241) > at > org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189) > at > org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176) > at > scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303) > at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297) > at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) > at 
scala.collection.TraversableLike.filter(TraversableLike.scala:395) > at scala.collection.TraversableLike.filter$(TraversableLike.scala:395) > at scala.collection.AbstractTraversable.filter(Traversable.scala:108) > at > org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219) > at > org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176) > at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220) > at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231) > at common.job.xxx$.$anonfun$main$3(xxx.scala:117) > at common.job.xxx$.$anonfun$main$3$adapted(xxx.scala:103) > at > org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187) > at > org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209) > > > -- This message was sent by Atlassian Jira (v8.20.7#820007) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org