tommy duan created SPARK-28546:
----------------------------------
Summary: Why does the File Sink operation of Spark 2.4 Structured
Streaming include double-level version validation?
Key: SPARK-28546
URL: https://issues.apache.org/jira/browse/SPARK-28546
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.4.0
Environment: Spark 2.4
Structured Streaming
Reporter: tommy duan
My code is as follows:
{code:java}
// Outputs 2 files
Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
    .options(this.getSparkKafkaCommonOptions(sparkSession))
    // Read spark-testapp.conf for custom configuration.
    .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092")
    .option("subscribe", "myTopic1,myTopic2")
    .option("startingOffsets", "earliest")
    .load();
String mdtTempView = "mybasetemp";
ExpressionEncoder<Row> Rowencoder = this.getSchemaEncoder(
    new Schema.Parser().parse(baseschema.getValue()));
// map() returns the parsed Dataset; the view is registered in a separate
// statement because createOrReplaceGlobalTempView() returns void.
Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
    .map(new MapFunction<byte[], Row>() {
        ....
    }, Rowencoder);
parseVal.createOrReplaceGlobalTempView(mdtTempView);
Dataset<Row> queryResult = this.sparkSession.sql("select ... from global_temp."
    + mdtTempView + " where start_time<>\"\"");
String savePath = "/user/dx/streaming/data/testapp";
String checkpointLocation = "/user/dx/streaming/checkpoint/testapp";
StreamingQuery query = queryResult.writeStream().format("parquet")
    .option("path", savePath)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("month", "day", "hour")
    .outputMode(OutputMode.Append())
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .start();
try {
    query.awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}
{code}
1) On the first run, the app ran normally.
2) Then, for some reason, I deleted the Structured Streaming checkpoint
directory but did not delete the file sink's save path, which holds the output
files on HDFS.
3) I then restarted the app. After startup, only executors were allocated and
no tasks were assigned. In the log I found the message:
"INFO streaming.FileStreamSink: Skipping already committed batch 72". Later I
looked at the source code and found that the log message comes from
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.a
L108.
4) The situation in 3) lasted for several hours before the DAGScheduler was
triggered to divide the DAG into stages, submit the stages and tasks, and
assign tasks to the executors.
Later, I read the
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
code carefully and realized that FileStreamSink keeps its own log under
savepath/_spark_metadata. If the current batchId <= fileLog.getLatest(), that
is, the batch is already recorded in that log, the sink skips saving and only
outputs the log message: logInfo(s"Skipping already committed batch $batchId").
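The skipping behaviour described above can be modelled with a small, self-contained sketch. It is a toy model and not Spark's code: the class and method names (FileSinkSkipSketch, SinkMetadataLog, addBatch, runBatches) are hypothetical, and it assumes that batch ids restart at 0 once the checkpoint directory is gone while the log under savepath/_spark_metadata survives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Toy model of the "already committed batch" check; not Spark's actual classes. */
public class FileSinkSkipSketch {

    /** Hypothetical stand-in for the log kept under savepath/_spark_metadata. */
    static class SinkMetadataLog {
        private final List<Long> committed = new ArrayList<>();

        void add(long batchId) {
            committed.add(batchId);
        }

        /** Highest batch id recorded so far, or empty if nothing was committed. */
        OptionalLong getLatest() {
            return committed.stream().mapToLong(Long::longValue).max();
        }
    }

    /** Returns true if the batch was written, false if it was skipped. */
    static boolean addBatch(SinkMetadataLog log, long batchId) {
        if (batchId <= log.getLatest().orElse(-1L)) {
            // The real sink logs "Skipping already committed batch $batchId" here.
            return false;
        }
        log.add(batchId);
        return true;
    }

    /** Feeds batch ids first..last (inclusive) to the sink; returns how many were written. */
    static int runBatches(SinkMetadataLog log, long first, long last) {
        int written = 0;
        for (long id = first; id <= last; id++) {
            if (addBatch(log, id)) {
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        SinkMetadataLog log = new SinkMetadataLog();

        // First run: batches 0..72 are written and recorded in the sink log.
        System.out.println("first run wrote " + runBatches(log, 0, 72) + " batches");

        // Checkpoint deleted (assumption: batch ids restart at 0), sink log kept:
        // batches 0..72 are skipped and only 73 and 74 are written.
        System.out.println("second run wrote " + runBatches(log, 0, 74) + " batches");
    }
}
```

Under this model, and assuming roughly one batch per one-minute trigger, a restart would spend about one minute per already-recorded batch before any new data is written. That would be consistent with the delay in step 4), but it is an inference and is not confirmed in this report.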
I think that since a checkpoint is used, all control over this information
should be given to the checkpoint, and the sink should not keep a separate
record of the batchId.
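To check whether the two records have drifted apart, one could compare the highest batch id in the sink log with the highest batch id in the checkpoint. The following is only a sketch under stated assumptions: it assumes both directories contain files named after the batch id (optionally with a ".compact" suffix in the sink log), that the checkpoint keeps such files in a "commits" subdirectory, and that the directories are reachable through the local file system (for HDFS paths like the ones above, a Hadoop FileSystem client would be needed instead). The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.OptionalLong;
import java.util.stream.Stream;

/** Sketch: find the highest batch id among the entries of a log directory. */
public class LatestBatchIdSketch {

    private static final String COMPACT_SUFFIX = ".compact";

    /** Parses "72" or "72.compact" into 72; returns -1 for any other name. */
    static long parseBatchId(String fileName) {
        String name = fileName.endsWith(COMPACT_SUFFIX)
                ? fileName.substring(0, fileName.length() - COMPACT_SUFFIX.length())
                : fileName;
        // Accept plain decimal digits only (hidden .crc files, temp files, etc. are ignored).
        return name.matches("\\d{1,18}") ? Long.parseLong(name) : -1L;
    }

    /** Highest batch id found in dir, or empty if dir is missing or has no batch entries. */
    static OptionalLong latestBatchId(Path dir) {
        if (!Files.isDirectory(dir)) {
            return OptionalLong.empty();
        }
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .map(p -> p.getFileName().toString())
                    .mapToLong(LatestBatchIdSketch::parseBatchId)
                    .filter(id -> id >= 0)
                    .max();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // args[0]: sink output path, args[1]: checkpoint location (both local paths here).
        Path sinkLog = Paths.get(args[0], "_spark_metadata");
        Path commits = Paths.get(args[1], "commits"); // assumed checkpoint layout
        System.out.println("sink log latest batch:   " + latestBatchId(sinkLog));
        System.out.println("checkpoint latest batch: " + latestBatchId(commits));
    }
}
```

If the sink log reports a latest batch while the checkpoint directory is empty or missing, the restarted query would be expected to skip every batch up to that id, as described above.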