[ 
https://issues.apache.org/jira/browse/SPARK-23844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162438#comment-17162438
 ] 

pengzhiwei commented on SPARK-23844:
------------------------------------

Thanks [~jerryshao2015], I have run into the same issue.

> Socket Stream recovering from checkpoint will throw exception
> -------------------------------------------------------------
>
>                 Key: SPARK-23844
>                 URL: https://issues.apache.org/jira/browse/SPARK-23844
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Saisai Shao
>            Priority: Major
>
> When a checkpoint location is specified together with the socket source, the 
> query throws an exception when it is rerun:
> {noformat}
> 18/04/02 14:11:28 ERROR MicroBatchExecution: Query test [id = c5ca82b2-550b-4c3d-9127-869f1aeae477, runId = 552d5bd4-a7e7-44e5-a85a-2f04f666ff6a] terminated with error
> java.lang.RuntimeException: Offsets committed out of order: 0 followed by -1
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:196)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:373)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:370)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:142)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){noformat}
> Basically, this means that {{TextSocketMicroBatchReader}} is honoring the 
> offsets recovered from the checkpoint. This is not correct for the socket 
> source, because it does not support recovering from a checkpoint: even though 
> the offset is recovered, the actual data no longer matches that offset.
> To reproduce this issue:
> {code:java}
> val socket = spark.readStream.format("socket").options(Map("host" -> 
> "localhost", "port" -> "9999")).load
> spark.conf.set("spark.sql.streaming.checkpointLocation", "./checkpoint")
> socket.writeStream.format("parquet").option("path", 
> "./result").queryName("test").start
> {code}
> This fails on the second run.
> Although this source is not supported in production environments, I think we 
> should still make sure it works in test environments without throwing a 
> runtime exception.
>  
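
For anyone reading the trace, the check that fails can be illustrated with a small stand-alone model. This is only a sketch and not Spark's actual code: ToySocketReader, CommitOrderDemo and the use of -1 as the "nothing committed yet" value are hypothetical, and the commit order (0, then -1) is an assumption chosen to match the message in the trace above.

```scala
import scala.util.Try

// Hypothetical stand-in for a socket-style reader; NOT Spark's actual class.
final class ToySocketReader {
  // Assumption for this toy model: -1 means "nothing committed yet".
  private var lastOffsetCommitted: Long = -1L

  // Reject any commit that moves backwards, as the error in the trace suggests.
  def commit(end: Long): Unit = synchronized {
    if (end < lastOffsetCommitted) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }
    lastOffsetCommitted = end
  }
}

object CommitOrderDemo {
  // Commits 0 and then -1, capturing the failure of the second commit.
  def run(): Try[Unit] = {
    val reader = new ToySocketReader
    reader.commit(0L)        // e.g. an offset replayed from the checkpoint (assumed)
    Try(reader.commit(-1L))  // e.g. the restarted reader's own empty offset (assumed)
  }

  def main(args: Array[String]): Unit =
    println(run().failed.map(_.getMessage).getOrElse("no error"))
}
```

The point of the sketch is that a monotonic-offset check cannot tell a stale checkpointed offset from a valid one, so a source that holds no replayable data fails as soon as the two disagree.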



