[ https://issues.apache.org/jira/browse/SPARK-24670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mahbub Murshed resolved SPARK-24670.
------------------------------------
    Resolution: Fixed

The problem with the count difference was ultimately solved by setting the maxFilesPerTrigger option to a low number. Initially it was set to 1000, meaning Spark would try to process 1000 files per micro-batch. For the given data, that is roughly 50 days' worth of files, which would take about two days to process before anything is written to disk. Setting the option to a lower number solves the problem (see the sketch at the end of this message).

> How to stream only newer files from a folder in Apache Spark?
> -------------------------------------------------------------
>
>                 Key: SPARK-24670
>                 URL: https://issues.apache.org/jira/browse/SPARK-24670
>             Project: Spark
>          Issue Type: Question
>          Components: Input/Output, Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Mahbub Murshed
>            Priority: Major
>
> Background:
> I have a directory in Google Cloud Storage containing files for 1.5 years of data. The files are named hits_<DATE>_<COUNT>.csv. For example, for June 24 there might be three files: hits_20180624_000.csv, hits_20180624_001.csv, and hits_20180624_002.csv. The folder has files going back to January 2017, and new files are dropped into the folder every day.
> I am reading the files with Spark Structured Streaming and writing to AWS S3.
> Problem:
> For the first batch, Spark processes ALL files in the folder. It will take about a month to complete the entire set.
> Moreover, when writing out the data, Spark does not completely write out each day's data until the entire folder has been processed.
> Example:
> Say each input file contains 100,000 records.
> Input:
> hits_20180624_000.csv
> hits_20180624_001.csv
> hits_20180624_002.csv
> hits_20180623_000.csv
> hits_20180623_001.csv
> ...
> hits_20170101_000.csv
> hits_20170101_001.csv
> Processing:
> Drops half the records (say), so each day's output should contain 50,000 records.
> Expected output (the number of files may differ):
> year=2018/month=6/day=24/hash0.parquet
> year=2018/month=6/day=24/hash1.parquet
> year=2018/month=6/day=24/hash2.parquet
> year=2018/month=6/day=23/hash0.parquet
> year=2018/month=6/day=23/hash1.parquet
> ...
> Problem:
> Each day contains fewer than 50,000 records unless the entire batch is complete. This behavior was reproduced in a test with a small subset of the data.
> Question:
> Is there a way to configure Spark not to load older files, even for the first load? Why is Spark not writing out the remaining records?
> Things I tried:
> 1. A trigger interval of 1 hour
> 2. Watermarking based on event time
> [1]: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
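
For reference, a minimal sketch of the kind of read/write configuration described above, assuming a CSV file source on GCS and a partitioned Parquet sink on S3. The bucket names, schema, column names, and checkpoint location are placeholders, not taken from this issue; maxFilesPerTrigger and latestFirst are the file-source options defined in FileStreamOptions [1].

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object HitsStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hits-stream").getOrCreate()

    // Placeholder schema; streaming CSV sources require an explicit schema.
    val hitsSchema = new StructType()
      .add("eventtime", TimestampType)
      .add("year", IntegerType)
      .add("month", IntegerType)
      .add("day", IntegerType)
      .add("payload", StringType)

    val hits = spark.readStream
      .schema(hitsSchema)
      .option("header", "true")
      // Cap the number of files picked up per micro-batch so each batch
      // finishes and commits its output instead of scanning the whole backlog.
      .option("maxFilesPerTrigger", "10")
      // Also defined in FileStreamOptions [1]: process the newest files first
      // when there is a large backlog of older files.
      .option("latestFirst", "true")
      .csv("gs://my-bucket/hits/")        // placeholder input path

    // (Per-record filtering/transformation omitted here.)
    val query = hits.writeStream
      .format("parquet")
      .partitionBy("year", "month", "day")
      .option("path", "s3a://my-output-bucket/hits/")                           // placeholder output path
      .option("checkpointLocation", "s3a://my-output-bucket/checkpoints/hits/") // placeholder
      .start()

    query.awaitTermination()
  }
}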