Arun Mahadevan created SPARK-24462:
--------------------------------------

             Summary: Text socket micro-batch reader throws error when a query is restarted with saved state
                 Key: SPARK-24462
                 URL: https://issues.apache.org/jira/browse/SPARK-24462
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 2.3.0
            Reporter: Arun Mahadevan


Exception thrown when the query is restarted:

{noformat}
scala> 18/06/01 22:47:04 ERROR MicroBatchExecution: Query [id = 0bdc4428-5d21-4237-9d64-898ae65f28f3, runId = f6822423-2bd2-47c1-8ed6-799d1c481195] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 2 followed by -1
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:197)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$2$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:377)
{noformat}
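
The stack trace points at the out-of-order commit guard in TextSocketMicroBatchReader.commit (socket.scala:197). A rough sketch of that guard follows (approximate shape only, not the verbatim Spark source; field and helper names are assumptions):

{code:java}
// Approximate sketch of the guard in TextSocketMicroBatchReader.commit.
// lastOffsetCommitted is the reader's in-memory bookkeeping offset.
override def commit(end: Offset): Unit = synchronized {
  val newOffset = ... // convert `end` to a LongOffset
  val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
  if (offsetDiff < 0) {
    sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
  }
  // otherwise: trim the committed rows and advance lastOffsetCommitted
  ...
}
{code}

Given the message format, "2 followed by -1" reads as lastOffsetCommitted being 2 when a commit of -1 arrives, which is exactly the case this guard rejects when the query is restarted from the checkpoint.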
 

Sample code that reproduces the error when the query is restarted:

 
{code:java}
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

import spark.implicits._

// Socket source with an event timestamp attached to each line.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()

// Split each line into (word, timestamp) pairs.
val words = lines.as[(String, Timestamp)]
  .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
  .toDF("word", "timestamp")

// Tumbling 20-minute window counts per word.
val windowedCounts = words
  .groupBy(window($"timestamp", "20 minutes", "20 minutes"), $"word")
  .count()
  .orderBy("window")

// Checkpointed query; restarting it with the state saved under
// /tmp/debug triggers the commit error above.
val query = windowedCounts.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/debug")
  .format("console")
  .option("truncate", "false")
  .start()
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
