[ https://issues.apache.org/jira/browse/SPARK-24670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mahbub Murshed resolved SPARK-24670.
------------------------------------
    Resolution: Fixed

The problem with the count difference was ultimately solved by setting the 
maxFilesPerTrigger option to a low number. Initially it was set to 1000, 
meaning Spark would try to process 1000 files in a single micro-batch. For the 
given data that is roughly 50 days' worth of input, which would take about 2 
days to complete before anything is written out to disk. Setting it to a lower 
number solves the problem.
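
For context, a minimal sketch of how this option is set on a streaming file 
source; the schema, the path, and the value 10 below are illustrative 
assumptions, not taken from this issue:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder.appName("hits-stream").getOrCreate()

    // maxFilesPerTrigger caps how many new files the file source picks up per
    // micro-batch, so each batch finishes and commits its output instead of
    // spanning the whole backlog.
    val hits = spark.readStream
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", "10")   // assumed value; anything well below 1000
      .schema(new StructType()              // a streaming CSV source needs an explicit schema
        .add("eventTime", TimestampType)
        .add("userId", StringType))
      .load("gs://my-bucket/hits/")         // hypothetical input path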

> How to stream only newer files from a folder in Apache Spark?
> -------------------------------------------------------------
>
>                 Key: SPARK-24670
>                 URL: https://issues.apache.org/jira/browse/SPARK-24670
>             Project: Spark
>          Issue Type: Question
>          Components: Input/Output, Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Mahbub Murshed
>            Priority: Major
>
> Background:
> I have a directory in Google Cloud Storage containing files for 1.5 years of 
> data. The files are named hits_<DATE>_<COUNT>.csv. For example, for June 
> 24, say there are three files: hits_20180624_000.csv, hits_20180624_001.csv, 
> hits_20180624_002.csv, etc. The folder contains files going back to January 2017. New 
> files are dropped in the folder every day.
> I am reading the files using Spark streaming and writing to AWS S3. 
> Problem:
> For the first batch, Spark processes ALL files in the folder. It will take 
> about a month to complete the entire set.
> Moreover, when writing out the data, Spark doesn't completely write out each 
> day's data until the entire folder has been processed.
> Example:
> Say each input file contains 100,000 records.
> Input:
> hits_20180624_000.csv
> hits_20180624_001.csv
> hits_20180624_002.csv
> hits_20180623_000.csv
> hits_20180623_001.csv
> ...
> hits_20170101_000.csv
> hits_20170101_001.csv
> Processing:
> Drops half of the records (say), so each day's output should contain 50,000 
> records.
> Expected output (the number of files may differ):
> year=2018/month=6/day=24/hash0.parquet
> year=2018/month=6/day=24/hash1.parquet
> year=2018/month=6/day=24/hash2.parquet
> year=2018/month=6/day=23/hash0.parquet
> year=2018/month=6/day=23/hash1.parquet
> ...
> Problem:
> Each day contains fewer than 50,000 records unless the entire batch is 
> complete. This behavior was reproduced in a test with a small subset.
> Question:
> Is there a way to configure Spark to not load older files, even for the first 
> load? Why is Spark not writing out the remaining records?
> Things I tried:
> 1. A trigger of 1 hour
> 2. Watermarking based on event time
> [1]: 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
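
To make the quoted description concrete, here is a hedged sketch of the kind 
of query it describes: a streaming CSV read from the GCS folder, a stand-in 
filter for the processing step, and a Parquet sink on S3 partitioned by 
year/month/day with the 1-hour trigger the reporter mentions. Column names, 
paths, and the filter are assumptions; only maxFilesPerTrigger, the trigger 
interval, and the partition layout come from the issue and its resolution.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, dayofmonth, month, year}
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder.appName("hits-stream").getOrCreate()

    // Hypothetical schema; the real columns are not given in the issue.
    val hitsSchema = new StructType()
      .add("eventTime", TimestampType)
      .add("userId", StringType)
      .add("url", StringType)

    val hits = spark.readStream
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", "10")   // small batches, per the resolution
      .schema(hitsSchema)
      .load("gs://my-bucket/hits/")         // hypothetical GCS input path

    // Stand-in for the processing that "drops half of the records".
    val cleaned = hits
      .filter(col("url").isNotNull)
      .withColumn("year", year(col("eventTime")))
      .withColumn("month", month(col("eventTime")))
      .withColumn("day", dayofmonth(col("eventTime")))

    // Parquet sink on S3, laid out as year=.../month=.../day=... like the
    // expected output above; a micro-batch's output is committed only when
    // that batch completes, which is why small batches surface data sooner.
    val query = cleaned.writeStream
      .format("parquet")
      .option("path", "s3a://my-bucket/hits-parquet/")                // hypothetical sink path
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/hits/")
      .partitionBy("year", "month", "day")
      .trigger(Trigger.ProcessingTime("1 hour"))                      // the 1-hour trigger tried above
      .start()

    query.awaitTermination()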


