[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545577#comment-16545577
 ] 

Iqbal Singh commented on SPARK-24295:
-------------------------------------

Hey [~XuanYuan],

We are processing 3000 files every 5 minutes, 24x7, using Structured Streaming; the 
average file size is 120 MB.
 * Every Structured Streaming batch commit file is around 800 KB to 1000 KB, and the 
compact file keeps track of all the data from the start of the process. It grows to 
about 8 GB after 45 days, and compacting the file every 10th batch takes the 
Structured Streaming process more than 15 minutes.

 * We are using dynamic partitions while writing the data, which also increases the 
output file count for each micro-batch; the ratio is 2:3 (2 input files give us 
3 output files). 

 * Spark forces jobs to read the data through the "_spark_metadata" files if the 
input directory of the job is a Structured Streaming output, which wastes 
another 10-15 minutes generating the list of files from the "_spark_metadata" 
compact file.

 * The compact file stores its entries in JSON format and grows very fast if we 
have too many files to process in each batch (see the sketch right after this list).
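
For reference, each entry in the compact file (after the "v1" version header line) is 
one JSON-serialized SinkFileStatus from the same FileStreamSinkLog.scala, so the file 
grows with every output file ever committed. A rough illustration of what each line 
carries (field list as of Spark 2.3):
{code:java}
// One of these is written as a JSON line per committed output file and is
// carried forward on every compaction for the lifetime of the query.
case class SinkFileStatus(
    path: String,              // full output file path, the dominant cost per entry
    size: Long,
    isDir: Boolean,
    modificationTime: Long,    // the only time-based field available for retention
    blockReplication: Int,
    blockSize: Long,
    action: String)            // "add" or "delete"
{code}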

 

*File:* org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

-- The delete action is defined in the class "FileStreamSinkLog", but it is not 
implemented anywhere in the code.

 
{code:java}
object FileStreamSinkLog {
  val VERSION = 1
  val DELETE_ACTION = "delete"
  val ADD_ACTION = "add"
}
{code}
 

-- The code below, which removes sink log entries with the "delete" action while 
compacting the files, never executes:
{code:java}
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
  val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
  if (deletedFiles.isEmpty) {
    logs
  } else {
    logs.filter(f => !deletedFiles.contains(f.path))
  }
}
{code}
 

-- We do not have the batch number in the compact file as a metric, so it is hard to 
keep only a defined number of batches in the file. We do have the modification time, 
and we can use it to mark sink metadata log records as deleted based on a time-based 
retention policy, as sketched below. 
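
A minimal sketch of that idea (the retention window here is a hypothetical, 
user-supplied value, not an existing Spark setting): compactLogs could drop entries 
older than a cutoff instead of carrying everything forward.
{code:java}
// Sketch only: retention-based variant of compactLogs. "retentionMs" is an
// assumed configuration value, not an existing Spark option.
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
  val retentionMs = 45L * 24 * 60 * 60 * 1000            // e.g. keep 45 days of entries
  val cutoff = System.currentTimeMillis() - retentionMs
  // Keep only entries whose files were modified within the retention window.
  logs.filter(_.modificationTime >= cutoff)
}
{code}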

 

We have developed a Spark job that reads the metadata, generates the list of files 
for a particular batch (to keep the exactly-once guarantee), and passes that list to 
the downstream Spark job; reading the compact file with Spark takes about 60 seconds.
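
Roughly, the read side looks like the sketch below. The path and compact file name 
are placeholders; it only relies on the v1 sink log layout of a version header line 
followed by one JSON entry per line.
{code:java}
import org.apache.spark.sql.SparkSession

// Sketch: read a sink log compact file with Spark and extract the committed paths.
val spark = SparkSession.builder().appName("ReadSinkMetadata").getOrCreate()
import spark.implicits._

// "/data/output/_spark_metadata/9.compact" is a placeholder path.
val lines = spark.read.textFile("/data/output/_spark_metadata/9.compact")

// Skip the "v1" header and parse the remaining JSON lines into a DataFrame.
val entries = spark.read.json(lines.filter(l => l.startsWith("{")))

// File list for the batch, excluding any (currently unused) delete entries.
val files = entries
  .where($"action" === "add")
  .select($"path")
  .as[String]
  .collect()
{code}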

 

We are working on an explicit data purge job for the compact file to keep its size 
under control (rough sketch below). Please let me know if more details are required, 
and also if there is anything we are missing.
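
The current shape of that purge job is roughly the following; the path, the 45-day 
window, and the regex-based field extraction are placeholders, and in practice the 
rewrite has to run while the query is stopped and be made atomic.
{code:java}
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: rewrite the compact file, keeping only entries newer than the cutoff.
val fs = FileSystem.get(new Configuration())
val compactPath = new Path("/data/output/_spark_metadata/9.compact")  // placeholder
val cutoff = System.currentTimeMillis() - 45L * 24 * 60 * 60 * 1000   // keep 45 days

val lines = scala.io.Source.fromInputStream(fs.open(compactPath)).getLines().toList
val header = lines.head                         // "v1" version line
val kept = lines.tail.filter { json =>          // crude modificationTime extraction
  """"modificationTime":(\d+)""".r.findFirstMatchIn(json).forall(_.group(1).toLong >= cutoff)
}

// Write the trimmed log to a temp file, then swap it in place of the original.
val tmp = new Path(compactPath.toString + ".purged.tmp")
val out = fs.create(tmp, true)
out.write((header :: kept).mkString("\n").getBytes(StandardCharsets.UTF_8))
out.close()
fs.delete(compactPath, false)
fs.rename(tmp, compactPath)
{code}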

 

Appreciate your help.

 

Thanks,

Iqbal Singh

> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> ------------------------------------------------------------------------
>
>                 Key: SPARK-24295
>                 URL: https://issues.apache.org/jira/browse/SPARK-24295
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Iqbal Singh
>            Priority: Major
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file 
> after the defined compact interval.
> For long-running jobs, the compact file can grow to tens of GBs, causing 
> slowness when reading data from the FileStreamSinkLog dir, as Spark defaults 
> to the "_spark_metadata" dir for the read.
> We need functionality to purge old entries from the compact file.
>  


