[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch
[ https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236519#comment-14236519 ] 宿荣全 commented on SPARK-4734: [~srowen] I think I still have not described the suggestion clearly.
1. The limit on received data applies per batch duration, not per day.
2. When the amount of received data far exceeds what the cluster can process in one batch duration, I think increasing the batch duration is pointless.
3. About the threshold value: streaming processes according to the data actually received. When the amount received in one batch duration is less than the limit, it is processed as-is; when it is more than the limit, only the threshold amount of data is processed.
Returning to the earlier example:
-- batch duration: 2s
-- cluster processing capacity per batch duration: 50M
-- HDFS (another system's output files):
-- 8:00~22:00: files totaling 60M are added per batch duration.
-- 22:00~8:00 (next day): files totaling 3M are added per batch duration.
Amount of data received per day [8:00~8:00 next day]:
batch durations per hour: 60s/2s * 60min = 1800
8:00~22:00 ==> 14 hours * 1800 * 60M (input data) = 1,512,000M
22:00~8:00 (next day) ==> 10 hours * 1800 * 3M (input data) = 54,000M
total received per day: 1,512,000M + 54,000M = 1,566,000M
What this patch does: put each newly found file's name and size into a queue, and have streaming take file entries from the queue to build each batch's input data, as follows:
jobs queue:
batch duration: 2s --> 2s -> n * 2s -> 22:00 -> n * 2s ---> 8:00 next day
received data:  60M -> 60M -> ... -> 3M -> 3M -> 3M -> 60M -> ...
processed data: 50M -> 50M -> 50M ... -> 50M ... -> 3M ... -> 50M -> 50M
Data processed with this patch:
8:00~22:00 ==> 14 hours * 1800 * 50M (threshold value) = 1,260,000M
backlog in the queue at 22:00: 1,512,000M - 1,260,000M = 252,000M.
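The arithmetic above can be double-checked with a few lines of standalone Python (not Spark code; all numbers are taken from the example):

```python
# Standalone check of the example arithmetic above (not Spark code).
BATCH_S = 2                          # batch duration: 2 s
BATCHES_PER_HOUR = 3600 // BATCH_S   # 1800 batch durations per hour

day_in = 14 * BATCHES_PER_HOUR * 60    # 8:00-22:00, 60M arriving per batch
night_in = 10 * BATCHES_PER_HOUR * 3   # 22:00-8:00, 3M arriving per batch
print(day_in, night_in, day_in + night_in)   # -> 1512000 54000 1566000

processed_by_22 = 14 * BATCHES_PER_HOUR * 50  # 50M threshold per batch
print(day_in - processed_by_22)               # backlog at 22:00 -> 252000
```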
So for the next few hours the app still processes 50M per batch duration, until there is no backlog left in the file queue. If possible, please see my modification of the source code at https://github.com/surq/spark/commit/77ab6a619444bd4e2d40da21f441b1d918798700. Thanks!
> [Streaming]limit the file Dstream size for each batch
> -
>
> Key: SPARK-4734
> URL: https://issues.apache.org/jira/browse/SPARK-4734
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: 宿荣全
> Priority: Minor
>
> Streaming scans new files from HDFS and processes those files in each batch. Current streaming has some problems:
> 1. When the number of files (or their total size) is very large in some batch, the processing time required becomes very long, and may exceed the slideDuration. Eventually this delays dispatch of the next batch.
> 2. When the total size of the file DStream in one batch is very large, the shuffle over that data multiplies memory occupation several times, and the app becomes slow or is even terminated by the operating system.
> So if we set an upper limit on the input data for each batch to control the batch processing time, the job dispatch delay and the processing delay will be alleviated.
> Modification:
> Add a new parameter "spark.streaming.segmentSizeThreshold" to InputDStream (the input data base class). The size of each batch's segments is set by this parameter, from [spark-defaults.conf] or in source code.
> All implementing classes of InputDStream take the corresponding action based on segmentSizeThreshold.
> This patch modifies FileInputDStream: when new files are found, their names and sizes are put into a queue, and elements are taken from the queue and packaged into a batch whose total size is < segmentSizeThreshold. Please see the source for the detailed logic.
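A minimal sketch of the queue-packing logic the description refers to (plain Python rather than the patch's Scala; `take_batch` and the file names are illustrative, not the actual FileInputDStream code):

```python
from collections import deque

def take_batch(file_queue, threshold):
    """Take (name, size) entries from the front of the queue while the
    batch's total size stays below the threshold, as in the description."""
    batch, total = [], 0
    while file_queue and total + file_queue[0][1] < threshold:
        name, size = file_queue.popleft()
        batch.append(name)
        total += size
    return batch, total

files = deque([("a.log", 20), ("b.log", 20), ("c.log", 20), ("d.log", 5)])
print(take_batch(files, 50))  # -> (['a.log', 'b.log'], 40)
print(take_batch(files, 50))  # -> (['c.log', 'd.log'], 25)
```

One caveat of a strict `total < threshold` rule: a single file at or above the threshold would never be taken, so a real implementation would need to handle oversized files specially.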
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch
[ https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235493#comment-14235493 ] Sean Owen commented on SPARK-4734: -- Well, here you have the apparent problem that you receive 103M records per day, but can only process 100M per day. That will not work in the long run. Let's assume you mean you get 97M records, then 3M. I think there is a much simpler solution: make a larger batch duration. If you double the batch duration, you always handle all 100M records per day without any change. That's partly the effect of your patch anyway. The 97M records are not processed in 12 hours; 47M are delayed anyway. Really, a file-based protocol is not the right tool here. It sounds like you really, really should use Kafka or another message queue to buffer the flow of records. You're sort of reinventing a very basic queue here.
[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch
[ https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235304#comment-14235304 ] 宿荣全 commented on SPARK-4734: [~srowen] I am very sorry that I didn't describe the suggestion clearly. This suggestion is about capping the amount of input data in each batch. The cluster's processing capacity per batch duration is fixed, but the amount of input data arriving per batch duration is not steady: sometimes large, sometimes small, sometimes there is no input at all. If we cap the per-batch input, the best cap is a value close to the cluster's per-batch processing capacity. A large input will then be processed over the next several batch durations, and the app may still be processing earlier input during batch durations with effectively no new input.
For example:
-- batch duration: 2s
-- cluster processing capacity per batch duration: 50M
-- HDFS (another system's output files):
-- 8:00~22:00: files totaling 100M are added per batch duration.
-- 22:00~8:00 (next day): files totaling 3M are added per batch duration.
-- current streaming: processing time far exceeds the batch duration, the scheduling delay grows to several hours, and the app is finally terminated by the operating system.
-- with this patch: the app keeps processing 50M per batch duration during [8:00~22:00], postponing the surplus input; during [22:00~8:00 next day] it processes more than 3M per batch duration, draining the backlog.
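As a toy illustration of the capped behaviour in this example (standalone Python, not Spark code; the 50M cap and arrival rates are the example's numbers), one can simulate one day of batches. Note that with these particular numbers the nightly low-traffic window is not long enough to drain the whole daytime backlog:

```python
# Toy one-day simulation of per-batch capping (standalone Python, not Spark).
CAP = 50                      # per-batch threshold, in M
BATCHES_PER_HOUR = 3600 // 2  # 2 s batch duration -> 1800 batches/hour

backlog = 0
for _ in range(14 * BATCHES_PER_HOUR):  # 8:00-22:00: 100M arrives per batch
    backlog += 100
    backlog -= min(backlog, CAP)        # at most 50M processed per batch
day_backlog = backlog                   # backlog queued up by 22:00
for _ in range(10 * BATCHES_PER_HOUR):  # 22:00-8:00: only 3M arrives per batch
    backlog += 3
    backlog -= min(backlog, CAP)        # drains at most 47M net per batch

print(day_backlog)  # -> 1260000 (M queued at 22:00)
print(backlog)      # -> 414000 (M still queued at 8:00 next day)
```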
[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch
[ https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235102#comment-14235102 ] Apache Spark commented on SPARK-4734: - User 'surq' has created a pull request for this issue: https://github.com/apache/spark/pull/3597
[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch
[ https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234146#comment-14234146 ] Sean Owen commented on SPARK-4734: -- I don't quite understand this suggestion. In general, if processing time exceeds the batch duration, you simply need a longer batch duration or need to speed up your processing. Lots of small files are a problem in general, for shuffle -- although less so for a sort-based shuffle. The basic solution there is: don't design your system to put lots of tiny files on HDFS. Are you suggesting capping the amount of data in each batch? This does not solve either problem. Either you are just running more, smaller batches, or you are dropping data. In any event this amounts to a significant change in semantics. This doesn't sound likely.