[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch

2014-12-05 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236519#comment-14236519
 ] 

宿荣全 commented on SPARK-4734:


[~srowen]
I think I still have not described the suggestion clearly.
1. The limit on received data that triggers a job applies per batch duration, 
not per day.
2. When the amount of received data far exceeds what the cluster can process 
within one batch duration, making the batch duration larger is, I think, 
pointless.
3. About the threshold value: streaming processes according to what it 
actually receives. When the amount received in one batch duration is below the 
limit, all of it is processed; when it is above the limit, only the threshold 
amount is processed and the remainder stays queued (a tiny sketch follows).
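
A tiny sketch of rule 3 (the names and units are mine, purely illustrative):

object ThresholdSemantics {
  // Amount handed to the batch: everything when under the limit,
  // otherwise exactly the threshold value (the rest stays queued).
  def sizeToProcess(receivedMB: Long, thresholdMB: Long): Long =
    math.min(receivedMB, thresholdMB)

  def main(args: Array[String]): Unit = {
    println(sizeToProcess(60, 50)) // 50 -> the extra 10M stays in the queue
    println(sizeToProcess(3, 50))  // 3  -> all of it is processed
  }
}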

Returning to the last example:
-- batch duration: 2s
-- cluster processing capacity per batch duration: 50M
-- HDFS (another system's output files):
-- 8:00~22:00: new files totaling 60M per batch duration
-- 22:00~8:00 the next day: new files totaling 3M per batch duration

With the patch, the amount of data received over [8:00~8:00 the next day] is 
unchanged:
batches per hour: (60s / 2s) * 60 min = 1,800
8:00~22:00 ==> 14 hours * 1,800 * 60M (input data) = 1,512,000M
22:00~8:00 ==> 10 hours * 1,800 * 3M (input data) = 54,000M
total received per day: 1,512,000M + 54,000M = 1,566,000M

How the patched streaming processes it:
Each newly found file's name and size is put into a queue, and streaming takes 
entries from that queue to assemble each batch's input (see the sketch after 
the figures below). The jobs-queue timeline:

batch duration: 2s --> 2s --> n * 2s --> 22:00 --> n * 2s --> 8:00 the next day
received data:  60M -> 60M -> ... -> 3M -> 3M -> 3M -> 60M -> ...
processed data: 50M -> 50M -> 50M ... -> 50M ... -> 3M ... -> 50M -> 50M

Data actually processed by streaming:
8:00~22:00 ==> 14 hours * 1,800 * 50M (threshold value) = 1,260,000M
Queue backlog at 22:00: 1,512,000M - 1,260,000M = 252,000M. So for the next 
few hours streaming still processes 50M per batch duration, draining the 
backlog at 50M - 3M = 47M per batch (about 5,400 batches, roughly 3 hours), 
until the file queue is empty.
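
For reference, a minimal sketch of that queueing logic (this is not the actual 
patch code; all names here are made up):

import scala.collection.mutable

// One pending file: its path plus its size in bytes.
case class PendingFile(path: String, sizeBytes: Long)

class SizeCappedFileQueue(thresholdBytes: Long) {
  private val queue = mutable.Queue.empty[PendingFile]

  // Called whenever the directory scan finds new files.
  def enqueue(files: Seq[PendingFile]): Unit = queue ++= files

  // Called once per batch duration: take files until the next one would
  // push the batch total over the threshold.
  def nextBatch(): Seq[PendingFile] = {
    val batch = mutable.ArrayBuffer.empty[PendingFile]
    var total = 0L
    while (queue.nonEmpty && total + queue.head.sizeBytes <= thresholdBytes) {
      val f = queue.dequeue()
      total += f.sizeBytes
      batch += f
    }
    // A single file larger than the threshold is still emitted on its own,
    // otherwise it would block the queue forever.
    if (batch.isEmpty && queue.nonEmpty) batch += queue.dequeue()
    batch.toSeq
  }
}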

If possible, please have a look at my modification of the source code at 
https://github.com/surq/spark/commit/77ab6a619444bd4e2d40da21f441b1d918798700.
Thanks!

> [Streaming]limit the file Dstream size for each batch
> -
>
> Key: SPARK-4734
> URL: https://issues.apache.org/jira/browse/SPARK-4734
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: 宿荣全
>Priority: Minor
>
> Streaming scans new files from HDFS and processes them in each batch. 
> Current streaming has some problems:
> 1. When the number of files is very large (and their total size is very 
> large) in some batch segment, the required processing time becomes very long 
> and may exceed the slideDuration, which eventually delays dispatching the 
> next batch.
> 2. When the total size of the file DStream in one batch is very large, the 
> memory occupied after those DStream data are shuffled increases several 
> times over, and the app becomes slow or is even terminated by the operating 
> system.
> So if we set an upper limit on the input data for each batch, to control the 
> batch processing time, the job dispatch delay and the processing delay will 
> be alleviated.
> Modification:
> Add a new parameter "spark.streaming.segmentSizeThreshold" to InputDStream 
> (the base class for input data). The size of each batch's segment is set 
> through this parameter, from [spark-defaults.conf] or in source code.
> All classes implementing InputDStream take the corresponding action based on 
> segmentSizeThreshold.
> This patch modifies FileInputDStream: when new files are found, their names 
> and sizes are put into a queue, and elements are taken from the queue to 
> package a batch whose total size is < segmentSizeThreshold. Please see the 
> source for the detailed logic.
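
For reference, a sketch of how the proposed parameter would be set. Note that 
spark.streaming.segmentSizeThreshold exists only in this patch, not in 
released Spark; the unit (bytes) is an assumption, and the app name and 
directory are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("CappedFileStream")
  // Proposed by this patch only; not an existing Spark option.
  .set("spark.streaming.segmentSizeThreshold", (50L * 1024 * 1024).toString)
val ssc = new StreamingContext(conf, Seconds(2))
val lines = ssc.textFileStream("hdfs:///incoming") // placeholder directory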






[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch

2014-12-05 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235493#comment-14235493
 ] 

Sean Owen commented on SPARK-4734:
--

Well, here you have the apparent problem that you receive 103M records per day, 
but can only process 100M per day. That will not work in the long run. Let's 
assume you mean you get 97M records, then 3M.

I think there is a much simpler solution: make a larger batch duration. If you 
double the batch duration, you always handle all 100M records per day without 
any change.
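
(For concreteness, lengthening the batch interval is a one-line change; an 
illustrative sketch where the app name and path are placeholders:)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FileStream")
// Doubling the example's Seconds(2): each batch now covers twice the input,
// while the total daily volume stays the same.
val ssc = new StreamingContext(conf, Seconds(4))
val lines = ssc.textFileStream("hdfs:///incoming") // placeholder path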

That's partly the effect of your patch anyway. The 97M records are not 
processed in 12 hours; 47M are delayed anyway.

Really, a file-based protocol is not the right tool here. It sounds like you 
really, really should use Kafka or another message queue to buffer the flow of 
records. You're sort of reinventing a very basic queue here.
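
(A minimal sketch of that alternative, using the Spark 1.x receiver-based 
Kafka API; the ZooKeeper quorum, group id, and topic are placeholders. The 
existing spark.streaming.receiver.maxRate option caps records per second per 
receiver, which bounds each batch's size in roughly the way this patch 
intends:)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("KafkaBuffered")
  // Limits records/sec ingested per receiver, bounding each batch's size.
  .set("spark.streaming.receiver.maxRate", "10000")
val ssc = new StreamingContext(conf, Seconds(2))
// (zkQuorum, groupId, topic -> receiver threads) are placeholders.
val stream = KafkaUtils.createStream(ssc, "zk1:2181", "buffer-group", Map("events" -> 1))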




[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch

2014-12-05 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235304#comment-14235304
 ] 

宿荣全 commented on SPARK-4734:


[~srowen]
I am very sorry that I couldn't describe the suggestion clearly.
This suggestion is about capping the amount of data in each batch.
A cluster's capacity to process input data per batch duration is fixed, but 
the amount of data arriving per batch duration is not steady: sometimes large, 
sometimes small, sometimes none at all. If we cap the amount of data in each 
batch, the best cap is one close to the cluster's per-batch processing 
capacity. A very large input is then spread across the next several batch 
durations, and the app may still be processing earlier input during batch 
durations that bring effectively no new data.
For example:
-- batch duration: 2s
-- cluster processing capacity per batch duration: 50M
-- HDFS (another system's output files):
-- 8:00~22:00: new files totaling 100M per batch duration
-- 22:00~8:00 the next day: new files totaling 3M per batch duration
-- current streaming:
   processing time far exceeds the batch duration, the scheduling delay grows 
to several hours, and the app is finally terminated by the operating system.
-- this patch:
   the app keeps processing 50M per batch duration during [8:00~22:00], 
postponing the surplus input; during [22:00~8:00 the next day] it processes 
more than 3M per batch while draining the backlog.
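
For reference, the daily totals implied by these figures, computed in a quick 
sketch (this is the input-exceeds-capacity mismatch [~srowen] points out in 
his reply above):

// 2s batches => 1800 batches per hour
val batchesPerHour  = 3600 / 2
val dailyInputMB    = 14 * batchesPerHour * 100 + 10 * batchesPerHour * 3
val dailyCapacityMB = 24 * batchesPerHour * 50
println(dailyInputMB)    // 2574000M received per day
println(dailyCapacityMB) // 2160000M processable per day
// Received input exceeds processing capacity, so in this example the backlog
// grows without bound; the 60M-per-batch variant avoids this.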




[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch

2014-12-04 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235102#comment-14235102
 ] 

Apache Spark commented on SPARK-4734:
-

User 'surq' has created a pull request for this issue:
https://github.com/apache/spark/pull/3597




[jira] [Commented] (SPARK-4734) [Streaming]limit the file Dstream size for each batch

2014-12-04 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234146#comment-14234146
 ] 

Sean Owen commented on SPARK-4734:
--

I don't quite understand this suggestion. In general, if processing time 
exceeds the batch duration, you simply need a longer batch duration or need to 
speed up your processing. Lots of small files are a problem in general, for 
shuffle -- although less so for a sort-based shuffle.

The basic solution there is: don't design your system to put lots of tiny files 
on HDFS.

Are you suggesting capping the amount of data in each batch? This does not 
solve either problem. Either you are just running more, smaller batches, or you 
are dropping data. In any event this amounts to a significant change in 
semantics. This doesn't sound likely.
