[ 
https://issues.apache.org/jira/browse/SPARK-20325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-20325:
---------------------------------
    Issue Type: Documentation  (was: Bug)

> Spark Structured Streaming documentation Update: checkpoint configuration
> -------------------------------------------------------------------------
>
>                 Key: SPARK-20325
>                 URL: https://issues.apache.org/jira/browse/SPARK-20325
>             Project: Spark
>          Issue Type: Documentation
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Kate Eri
>            Priority: Minor
>
> I have configured the following streams (one query per metric), all outputting to Kafka:
> {code}
> map.foreach { metric =>
>   streamToProcess
>     .groupBy(metric)
>     .agg(count(metric))
>     .writeStream
>     .outputMode("complete")
>     .option("checkpointLocation", checkpointDir)  // the same checkpointDir is passed to every query
>     .foreach(kafkaWriter)
>     .start()
> }
> {code}
> I configured the checkpoint directory for each output sink via 
> .option("checkpointLocation", checkpointDir), following the documentation at 
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
> 
> As a result I got the following exception:
> {code}
> java.lang.IllegalStateException: Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
>       at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:291)
> {code}
> According to the current Spark logic, the checkpoint configuration for a 
> "foreach" sink query is resolved in the following way:
> {code:title=StreamingQueryManager.scala}
>     val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
>       new Path(userSpecified).toUri.toString
>     }.orElse {
>       df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
>         new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
>       }
>     }.getOrElse {
>       if (useTempCheckpointLocation) {
>         Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
>       } else {
>         throw new AnalysisException(
>           "checkpointLocation must be specified either " +
>             """through option("checkpointLocation", ...) or """ +
>             s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
>       }
>     }
> {code}
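The resolution chain in the snippet above can be sketched as plain Scala with no Spark dependency (Path handling is simplified to string concatenation; this illustrates the orElse fallback order, it is not the actual Spark code):

```scala
import java.util.UUID

// Sketch of StreamingQueryManager's checkpoint-location resolution order:
// 1) the per-query option, 2) the session default plus the query name (or a
// random UUID), 3) a temp dir if allowed, otherwise an error.
def resolveCheckpointLocation(
    userSpecifiedCheckpointLocation: Option[String],
    sessionCheckpointLocation: Option[String], // spark.sql.streaming.checkpointLocation
    userSpecifiedName: Option[String],
    useTempCheckpointLocation: Boolean): String =
  userSpecifiedCheckpointLocation
    .orElse(sessionCheckpointLocation.map { location =>
      s"$location/${userSpecifiedName.getOrElse(UUID.randomUUID().toString)}"
    })
    .getOrElse {
      if (useTempCheckpointLocation) s"/tmp/temporary-${UUID.randomUUID()}"
      else throw new IllegalArgumentException(
        "checkpointLocation must be specified either through " +
          """option("checkpointLocation", ...) or spark.sql.streaming.checkpointLocation""")
    }
```

Because the reported loop passes the same explicit option value for every query, step 1 always wins and all queries resolve to the same location.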
> So Spark first takes the checkpoint directory from the query's own option, then 
> falls back to the session configuration (spark.sql.streaming.checkpointLocation), 
> and so on. This behavior is not documented, which raises two questions:
> 1) Could we update the Structured Streaming documentation to describe this 
> behavior?
> 2) Do we really need to specify a distinct checkpoint directory per query? What 
> is the reason for this? In the end we will be forced to write some checkpointDir 
> name generator, for example one that associates the directory with a particular 
> named query.
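For question 2, a minimal workaround sketch under the current behavior: derive a distinct checkpoint directory per metric before starting each query, so that no two queries share a checkpoint (and hence a query id). The helper name checkpointDirFor and the base directory are hypothetical, not part of any Spark API:

```scala
// Hypothetical helper: one checkpoint directory per metric under a common base.
def checkpointDirFor(baseCheckpointDir: String, metric: String): String =
  s"$baseCheckpointDir/$metric"

// Usage inside the loop from the report (Spark calls shown as comments,
// since they require a live SparkSession):
// map.foreach { metric =>
//   streamToProcess
//     .groupBy(metric)
//     .agg(count(metric))
//     .writeStream
//     .outputMode("complete")
//     .option("checkpointLocation", checkpointDirFor("/checkpoints/metrics", metric))
//     .foreach(kafkaWriter)
//     .start()
// }
```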



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
