[ https://issues.apache.org/jira/browse/SPARK-20325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-20325:
---------------------------------
    Issue Type: Documentation  (was: Bug)

Spark Structured Streaming documentation Update: checkpoint configuration
-------------------------------------------------------------------------

                Key: SPARK-20325
                URL: https://issues.apache.org/jira/browse/SPARK-20325
            Project: Spark
         Issue Type: Documentation
         Components: Structured Streaming
   Affects Versions: 2.1.0
           Reporter: Kate Eri
           Priority: Minor

I have configured the following streams, outputting to Kafka:

{code}
map.foreach(metric => {
  streamToProcess
    .groupBy(metric)
    .agg(count(metric))
    .writeStream
    .outputMode("complete")
    .option("checkpointLocation", checkpointDir)
    .foreach(kafkaWriter)
    .start()
})
{code}

I configured the checkpoint directory for each of the output sinks with .option("checkpointLocation", checkpointDir), following the documentation:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

As a result, I got the following exception:

Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
java.lang.IllegalStateException: Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:291)

According to the current Spark logic for the "foreach" sink, the checkpoint location is resolved as follows:

{code:title=StreamingQueryManager.scala}
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
  new Path(userSpecified).toUri.toString
}.orElse {
  df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
    new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
  }
}.getOrElse {
  if (useTempCheckpointLocation) {
    Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
  } else {
    throw new AnalysisException(
      "checkpointLocation must be specified either " +
        """through option("checkpointLocation", ...) or """ +
        s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
  }
}
{code}

So Spark first takes the checkpoint directory from the query option, then from the SparkSession configuration (spark.sql.streaming.checkpointLocation, with the query name, or a random UUID if the query is unnamed, appended as a subdirectory). If neither is set, it either creates a temporary directory (when useTempCheckpointLocation is true) or throws an AnalysisException.

But this behavior is not documented, hence two questions:
1) Could we update the Structured Streaming documentation to describe this behavior?
2) Do we really need to specify the checkpoint directory per query, and what is the reason for this? Otherwise we will end up writing our own checkpoint directory name generator, for example one that derives the directory from a particular named query.
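The lookup order in the StreamingQueryManager excerpt can be modelled without Spark. The sketch below is a hypothetical, dependency-free Scala object: `CheckpointResolution.resolve` is an illustrative name, not a Spark API; plain string joining stands in for Hadoop's `Path`, and `IllegalArgumentException` stands in for Spark's `AnalysisException`. It mirrors the three steps of the excerpt and makes two consequences visible. First, passing the same `checkpointLocation` option to every query, as the loop above does, resolves all of them to one directory; because a query's id is persisted in its checkpoint, that is most likely why the second query is treated as a restart of the first one still running. Second, setting the session-wide location once and giving each query a distinct name yields a separate subdirectory per query with no hand-written generator.

```scala
import java.nio.file.Files
import java.util.UUID

// Hypothetical model of the checkpoint-location precedence shown in
// StreamingQueryManager.scala; not Spark code.
object CheckpointResolution {
  def resolve(
      userSpecifiedCheckpointLocation: Option[String], // .option("checkpointLocation", ...)
      sessionCheckpointLocation: Option[String],       // spark.sql.streaming.checkpointLocation
      userSpecifiedName: Option[String],               // .queryName(...)
      useTempCheckpointLocation: Boolean): String = {
    // 1. A per-query option wins and, in this model, is used as given.
    userSpecifiedCheckpointLocation.orElse {
      // 2. Otherwise use the session-wide base directory plus a per-query
      //    subdirectory: the query name, or a random UUID if unnamed.
      sessionCheckpointLocation.map { base =>
        base.stripSuffix("/") + "/" + userSpecifiedName.getOrElse(UUID.randomUUID().toString)
      }
    }.getOrElse {
      // 3. Neither is set: sinks that allow it get a fresh temporary
      //    directory; all others fail. IllegalArgumentException stands in
      //    for Spark's AnalysisException.
      if (useTempCheckpointLocation) {
        Files.createTempDirectory("temporary").toString
      } else {
        throw new IllegalArgumentException(
          "checkpointLocation must be specified either through " +
            "option(\"checkpointLocation\", ...) or spark.sql.streaming.checkpointLocation")
      }
    }
  }
}
```

For example, with a session base of "/checkpoints" and query names "count_a" and "count_b", the model resolves to "/checkpoints/count_a" and "/checkpoints/count_b", while passing the same option value to both queries returns that identical value for each. In the reporter's loop this would roughly correspond to calling `spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointDir)` once and replacing `.option("checkpointLocation", checkpointDir)` with a distinct `.queryName(...)` per metric; this is an untested suggestion against 2.1.0.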