[ https://issues.apache.org/jira/browse/SPARK-28546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894900#comment-16894900 ]
Hyukjin Kwon commented on SPARK-28546:
--------------------------------------

[~yy3b2007com], questions like this should go to the mailing list first. Let's discuss it there before filing an issue if you are not sure whether it is a bug.

Why does the File Sink operation of Spark 2.4 Structured Streaming include double-level version validation?
------------------------------------------------------------------------------------------------------------

                 Key: SPARK-28546
                 URL: https://issues.apache.org/jira/browse/SPARK-28546
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.0
         Environment: Spark 2.4
                      Structured Streaming
            Reporter: tommy duan
            Priority: Major

My code is as follows:

{code:java}
Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
    .options(this.getSparkKafkaCommonOptions(sparkSession))
    .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092")
    .option("subscribe", "myTopic1,myTopic2")
    .option("startingOffsets", "earliest")
    .load();
{code}

{code:java}
String mdtTempView = "mybasetemp";
ExpressionEncoder<Row> Rowencoder =
    this.getSchemaEncoder(new Schema.Parser().parse(baseschema.getValue()));

// Map the Kafka value bytes to Rows and register the result as a global temp view.
Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
    .map(new MapFunction<byte[], Row>() {
        // .... decode the value bytes into a Row here
    }, Rowencoder);
parseVal.createOrReplaceGlobalTempView(mdtTempView);

Dataset<Row> queryResult = this.sparkSession.sql(
    "select ... from global_temp." + mdtTempView + " where start_time<>\"\"");

String savePath = "/user/dx/streaming/data/testapp";
String checkpointLocation = "/user/dx/streaming/checkpoint/testapp";

StreamingQuery query = queryResult.writeStream().format("parquet")
    .option("path", savePath)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("month", "day", "hour")
    .outputMode(OutputMode.Append())
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .start();

try {
    query.awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}
{code}

1) When I first ran the application, it worked normally.

2) Then, for some reason, I deleted the Structured Streaming checkpoint directory, but did not delete the sink's save path (the HDFS directory the file sink writes to).

3) After restarting the app, only executors were allocated and no tasks were assigned. In the log I found the message "INFO streaming.FileStreamSink: Skipping already committed batch 72". Looking at the source code, I found that the message comes from https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L108

4) The situation in 3) lasted for several hours before the DAGScheduler was triggered to build the DAG, submit stages and tasks, and assign tasks to the executors.

Later, I read https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala carefully and realized that FileStreamSink keeps its own metadata log under savePath/_spark_metadata. If the current batchId is <= the latest batch id recorded in that log (fileLog.getLatest()), the sink skips the write and only prints logInfo(s"Skipping already committed batch $batchId").
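For reference, a minimal sketch of how the latest committed batch id under savePath/_spark_metadata can be read back, mirroring the same FileStreamSinkLog construction the sink itself uses. FileStreamSinkLog lives in a Spark-internal package, so treat this purely as a debugging aid; the `spark` session and `savePath` below stand in for the ones used by the job above:

{code:scala}
// Inspection sketch (run e.g. in spark-shell, where `spark` is already defined).
// FileStreamSinkLog is a Spark-internal class, so this is only for debugging.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

val savePath = "/user/dx/streaming/data/testapp"   // the sink path from the job above
val logPath = new Path(new Path(savePath), "_spark_metadata").toUri.toString
val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath)

// getLatest() returns Option[(batchId, files)]; -1 means no batch has been committed yet.
val latestBatchId = fileLog.getLatest().map(_._1).getOrElse(-1L)
println(s"Latest committed batch under $logPath: $latestBatchId")
{code}

Because the checkpoint was deleted in step 2), the restarted query starts numbering batches again while this sink log still holds the last batch id committed before the deletion, so every new batch whose id is not greater than that value is skipped, which matches the hours-long gap observed in step 4).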
The relevant part of FileStreamSink looks like this:

{code:java}
class FileStreamSink(
    sparkSession: SparkSession,
    path: String,
    fileFormat: FileFormat,
    partitionColumnNames: Seq[String],
    options: Map[String, String]) extends Sink with Logging {

  private val basePath = new Path(path)
  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
  private val fileLog =
    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      // save the batch's files to HDFS
    }
  }
  // ...
}
{code}

I think that since a checkpoint is already used, all of this bookkeeping should be controlled by the checkpoint alone, and the sink should not keep its own batchId record.
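A pragmatic workaround that follows from the analysis above (not an official recommendation) is to delete the sink's _spark_metadata directory whenever the checkpoint directory is deliberately deleted, so that the restarted query and the sink log agree on batch ids. A rough sketch using the Hadoop FileSystem API, with the paths from the job above and `spark` standing in for the session:

{code:scala}
// Cleanup sketch (run e.g. in spark-shell before restarting the query).
// WARNING: this discards the sink's exactly-once bookkeeping, so only do it
// when the output directory is meant to be reprocessed or rewritten anyway.
import org.apache.hadoop.fs.{FileSystem, Path}

val checkpointLocation = "/user/dx/streaming/checkpoint/testapp"
val savePath = "/user/dx/streaming/data/testapp"

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(checkpointLocation), true)            // recursive delete
fs.delete(new Path(savePath, "_spark_metadata"), true)   // recursive delete
{code}

Alternatively, keeping both the checkpoint and the sink metadata intact across restarts avoids the problem entirely, since the checkpointed batch ids and the sink log then stay consistent.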