[ 
https://issues.apache.org/jira/browse/SPARK-28546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tommy duan updated SPARK-28546:
-------------------------------
    Description: 
My code is as follows:
{code:java}
Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
 .options(this.getSparkKafkaCommonOptions(sparkSession)) 
 .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092")
 .option("subscribe", "myTopic1,myTopic2")
 .option("startingOffsets", "earliest")
 .load();{code}
{code:java}
String mdtTempView = "mybasetemp";
ExpressionEncoder<Row> Rowencoder = this.getSchemaEncoder(
    new Schema.Parser().parse(baseschema.getValue()));
// Parse the Kafka value bytes into rows, then register them as a global temp view.
Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
    .map(new MapFunction<byte[], Row>() {
      ....
    }, Rowencoder);
parseVal.createOrReplaceGlobalTempView(mdtTempView);

Dataset<Row> queryResult = this.sparkSession.sql("select ... from global_temp."
    + mdtTempView + " where start_time<>\"\"");

String savePath = "/user/dx/streaming/data/testapp";
String checkpointLocation = "/user/dx/streaming/checkpoint/testapp";

// Write hourly-partitioned Parquet files, triggering a micro-batch every minute.
StreamingQuery query = queryResult.writeStream().format("parquet")
    .option("path", savePath)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("month", "day", "hour")
    .outputMode(OutputMode.Append())
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .start();
try {
  query.awaitTermination();
} catch (StreamingQueryException e) {
  e.printStackTrace();
}
{code}
 

1) On the first run, the app worked normally.

2) Then, for some reason, I deleted the Structured Streaming checkpoint 
directory but did not delete the sink's save path, which holds the output 
files on HDFS.

3) I then restarted the app. After startup only executors were allocated and 
no tasks were assigned. In the log I found the message: 
"INFO streaming.FileStreamSink: Skipping already committed batch 72". I later 
looked at the source code and found that the message comes from 
[https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L108]

4) The situation in 3) lasted for several hours before the DAGScheduler was 
triggered to divide the DAG, submit stages and tasks, and assign tasks to the 
executors.
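
The state left behind by steps 2) and 3), where the sink's log still exists but the checkpoint does not, could be detected before starting the query. The following is only a rough sketch: the class and method names are hypothetical, and it uses java.nio.file on a local filesystem as a stand-in for HDFS, which a real job would access through the Hadoop FileSystem API instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointSinkCheck {
    // Hypothetical helper: true when savePath/_spark_metadata exists
    // but the checkpoint directory is missing.
    static boolean sinkLogOutlivesCheckpoint(Path checkpointLocation, Path savePath) {
        boolean hasCheckpoint = Files.isDirectory(checkpointLocation);
        boolean hasSinkLog = Files.isDirectory(savePath.resolve("_spark_metadata"));
        return hasSinkLog && !hasCheckpoint;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway layout that imitates the report: the sink
        // directory and its metadata log exist, the checkpoint does not.
        Path root = Files.createTempDirectory("testapp");
        Path savePath = Files.createDirectories(root.resolve("data"));
        Files.createDirectories(savePath.resolve("_spark_metadata"));
        Path checkpoint = root.resolve("checkpoint"); // never created: simulates the deletion

        System.out.println(sinkLogOutlivesCheckpoint(checkpoint, savePath));
    }
}
```

A check like this only reports the mismatch; what to do about it (restore the checkpoint, or write to a fresh save path) is left to the operator.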

Later, I read the code in 
[https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala]
carefully and realized that FileStreamSink keeps its own log under 
savepath/_spark_metadata. If the current batchId <= the latest batch ID 
recorded in that log (fileLog.getLatest()), the sink skips saving and only 
writes the log line: logInfo(s"Skipping already committed batch $batchId").
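
The comparison described above can be sketched in plain Java. This is an illustration of the reported behavior, not the Spark source: the class name, the method name, and the assumed latest committed batch ID of 72 (taken from the log message; the real value may be higher) are all hypothetical.

```java
import java.util.Optional;

public class SinkSkipSketch {
    // A batch is skipped when its ID is not greater than the latest batch ID
    // recorded in the sink's own log. An empty log is treated as -1, so
    // batch 0 is always written on a fresh save path.
    static boolean shouldSkip(long batchId, Optional<Long> latestCommitted) {
        return batchId <= latestCommitted.orElse(-1L);
    }

    public static void main(String[] args) {
        // Assumed state: the sink log under savepath/_spark_metadata already
        // records batch 72, while the deleted checkpoint makes the query
        // start numbering its batches from 0 again.
        Optional<Long> latest = Optional.of(72L);
        for (long batchId = 71; batchId <= 73; batchId++) {
            String action = shouldSkip(batchId, latest) ? "skipped" : "written";
            System.out.println("batch " + batchId + " " + action);
        }
    }
}
```

Under these assumptions, batches 71 and 72 are skipped and batch 73 is the first one written, which matches the observation that no output tasks run until the restarted query passes the batch ID already recorded by the sink.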

I think that, since a checkpoint is used, the checkpoint alone should control 
which batches are committed, and the sink should not keep a separate batchId 
log of its own.


> Why does the File Sink operation of Spark 2.4 Structured Streaming include 
> double-level version validation?
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28546
>                 URL: https://issues.apache.org/jira/browse/SPARK-28546
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark 2.4
> Structured Streaming
>            Reporter: tommy duan
>            Priority: Major


