[ https://issues.apache.org/jira/browse/SPARK-23844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-23844:
------------------------------------

    Assignee:     (was: Apache Spark)

> Socket Stream recovering from checkpoint will throw exception
> -------------------------------------------------------------
>
>                 Key: SPARK-23844
>                 URL: https://issues.apache.org/jira/browse/SPARK-23844
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Saisai Shao
>            Priority: Major
>
> When a checkpoint location is specified for a query that reads from the
> socket source, the query throws an exception when it is rerun:
> {noformat}
> 18/04/02 14:11:28 ERROR MicroBatchExecution: Query test [id = c5ca82b2-550b-4c3d-9127-869f1aeae477, runId = 552d5bd4-a7e7-44e5-a85a-2f04f666ff6a] terminated with error
> java.lang.RuntimeException: Offsets committed out of order: 0 followed by -1
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:196)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:373)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:370)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:370)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
>  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:353)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:142)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
>  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:135)
>  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){noformat}
> This means that {{TextSocketMicroBatchReader}} honors the offsets recovered
> from the checkpoint. That is incorrect for the socket source, because it does
> not support recovering from a checkpoint: even though the offset is
> recovered, the data actually available from the socket no longer corresponds
> to that offset.
>
> To reproduce this issue:
> {code:java}
> val socket = spark.readStream.format("socket").options(Map("host" -> "localhost", "port" -> "9999")).load()
> spark.conf.set("spark.sql.streaming.checkpointLocation", "./checkpoint")
> socket.writeStream.format("parquet").option("path", "./result").queryName("test").start()
> {code}
> The query fails on the second run.
>
> Although this source is not supported in production environments, I think we
> should still make sure it works in test environments without throwing a
> runtime exception.
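
The "Offsets committed out of order" error reported in this issue comes from an ordering check on committed offsets. The following is only a minimal sketch of that kind of check, not Spark's actual {{TextSocketMicroBatchReader}} code: the class {{ToySocketSource}}, its field {{lastCommitted}}, and the use of plain {{Long}} offsets are assumptions made for illustration.

```scala
import scala.util.Try

// Hypothetical stand-in for a non-replayable source; NOT Spark's implementation.
final class ToySocketSource {
  // -1 means "nothing committed yet" in this sketch.
  private var lastCommitted: Long = -1L

  /** Records `end` as processed; rejects a commit that moves backwards. */
  def commit(end: Long): Unit = synchronized {
    if (end < lastCommitted) {
      sys.error(s"Offsets committed out of order: $lastCommitted followed by $end")
    }
    lastCommitted = end
  }
}

val source = new ToySocketSource
source.commit(0L)                     // accepted: moves forward from -1 to 0
val result = Try(source.commit(-1L))  // rejected: -1 is behind the last commit
```

In this sketch {{result}} is a {{Failure}} wrapping a {{RuntimeException}} whose message has the same shape as the one in the log above. A source that cannot replay old data has no meaningful way to satisfy such a check against offsets restored from an earlier run, which is the mismatch the issue describes.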