[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amit Baghel updated SPARK-19768:
--------------------------------
    Summary: Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"  (was: Error for both aggregate and non-aggregate queries in Structured streaming - "This query does not support recovering from checkpoint location")

> Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19768
>                 URL: https://issues.apache.org/jira/browse/SPARK-19768
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Amit Baghel
>
> I am running the JavaStructuredKafkaWordCount.java example with a checkpointLocation. The output mode is "complete". Below is the relevant code.
> {code}
> // Generate running word count
> Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
>     @Override
>     public Iterator<String> call(String x) {
>         return Arrays.asList(x.split(" ")).iterator();
>     }
> }, Encoders.STRING()).groupBy("value").count();
>
> // Start running the query that prints the running counts to the console
> StreamingQuery query = wordCounts.writeStream()
>     .outputMode("complete")
>     .format("console")
>     .option("checkpointLocation", "/tmp/checkpoint-data")
>     .start();
> {code}
> This example runs successfully and writes data to the checkpoint directory. When I re-run the program, it throws the exception below.
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
>         at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
>         at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
>         at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
>         at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
> {code}
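> For reference, here is a minimal self-contained sketch of the aggregate case above, based on the JavaStructuredKafkaWordCount example. The class name, Kafka bootstrap servers, topic and paths are placeholders rather than the exact values from my run, and the Kafka source needs the spark-sql-kafka-0-10 artifact on the classpath.
> {code}
> import java.util.Arrays;
> import java.util.Iterator;
>
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.streaming.StreamingQuery;
>
> public final class CheckpointRecoveryRepro {
>     public static void main(String[] args) throws Exception {
>         SparkSession spark = SparkSession
>             .builder()
>             .appName("CheckpointRecoveryRepro")
>             .getOrCreate();
>
>         // Kafka source; "localhost:9092" and "wordcount" are placeholder values
>         Dataset<String> lines = spark
>             .readStream()
>             .format("kafka")
>             .option("kafka.bootstrap.servers", "localhost:9092")
>             .option("subscribe", "wordcount")
>             .load()
>             .selectExpr("CAST(value AS STRING)")
>             .as(Encoders.STRING());
>
>         // Running word count, same as the snippet above
>         Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public Iterator<String> call(String x) {
>                 return Arrays.asList(x.split(" ")).iterator();
>             }
>         }, Encoders.STRING()).groupBy("value").count();
>
>         // Console sink with a checkpoint location; the second run of this program
>         // fails in DataStreamWriter.start() with the AnalysisException shown above
>         StreamingQuery query = wordCounts.writeStream()
>             .outputMode("complete")
>             .format("console")
>             .option("checkpointLocation", "/tmp/checkpoint-data")
>             .start();
>
>         query.awaitTermination();
>     }
> }
> {code}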
> Then I modified JavaStructuredKafkaWordCount.java to use a non-aggregate query with output mode "append". Please see the code below.
> {code}
> // no aggregations
> Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
>     @Override
>     public Iterator<String> call(String x) {
>         return Arrays.asList(x.split(" ")).iterator();
>     }
> }, Encoders.STRING()).select("value");
>
> // append mode with console
> StreamingQuery query = wordCounts.writeStream()
>     .outputMode("append")
>     .format("console")
>     .option("checkpointLocation", "/tmp/checkpoint-data")
>     .start();
> {code}
> This modified code also runs successfully and writes data to the checkpoint directory. When I re-run the program, it throws the same exception.
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
>         at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
>         at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
>         at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
>         at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org